This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 358f33a27f IGNITE-17607 C++ Compute API (#1820)
358f33a27f is described below
commit 358f33a27fc3d97e2a5a7db122fb8b72627619a1
Author: Igor Sapego <[email protected]>
AuthorDate: Mon Mar 20 23:48:21 2023 +0400
IGNITE-17607 C++ Compute API (#1820)
---
.../cluster/ClientClusterGetNodesRequest.java | 2 +
.../ignite/internal/client/TcpIgniteClient.java | 3 +
modules/platforms/cpp/CMakeLists.txt | 2 +-
modules/platforms/cpp/cmake/ignite_test.cmake | 2 +-
modules/platforms/cpp/ignite/client/CMakeLists.txt | 5 +
.../cpp/ignite/client/compute/compute.cpp | 93 +++++++
.../platforms/cpp/ignite/client/compute/compute.h | 144 ++++++++++
.../ignite/client/detail/argument_check_utils.h | 65 +++++
.../cpp/ignite/client/detail/client_operation.h | 9 +
.../ignite/client/detail/compute/compute_impl.cpp | 127 +++++++++
.../ignite/client/detail/compute/compute_impl.h | 79 ++++++
.../ignite/client/detail/ignite_client_impl.cpp | 49 ++++
.../cpp/ignite/client/detail/ignite_client_impl.h | 20 ++
.../cpp/ignite/client/detail/node_connection.cpp | 2 +-
.../cpp/ignite/client/detail/table/table_impl.cpp | 215 +-------------
.../cpp/ignite/client/detail/table/table_impl.h | 19 ++
.../platforms/cpp/ignite/client/detail/utils.cpp | 296 ++++++++++++++++++--
modules/platforms/cpp/ignite/client/detail/utils.h | 23 ++
.../platforms/cpp/ignite/client/ignite_client.cpp | 13 +
.../platforms/cpp/ignite/client/ignite_client.h | 25 ++
.../end_point.h => client/network/cluster_node.h} | 89 +++---
modules/platforms/cpp/ignite/client/primitive.h | 27 +-
.../platforms/cpp/ignite/client/primitive_test.cpp | 17 +-
.../cpp/ignite/client/table/key_value_view.cpp | 59 ++--
.../cpp/ignite/client/table/record_view.cpp | 32 +--
modules/platforms/cpp/ignite/client/table/table.h | 1 +
modules/platforms/cpp/ignite/common/CMakeLists.txt | 1 +
.../cpp/ignite/{network => common}/end_point.h | 2 +-
.../platforms/cpp/ignite/network/async_handler.h | 2 +-
.../network/detail/linux/connecting_context.h | 7 +-
.../network/detail/linux/linux_async_client.h | 8 +-
.../detail/linux/linux_async_worker_thread.h | 11 +-
.../ignite/network/detail/win/win_async_client.h | 8 +-
.../platforms/cpp/tests/client-test/CMakeLists.txt | 1 +
.../cpp/tests/client-test/compute_test.cpp | 308 +++++++++++++++++++++
.../cpp/tests/client-test/ignite_runner_suite.h | 11 +
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 4 +
37 files changed, 1438 insertions(+), 343 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
index 20032f4fa5..4d3d137083 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
@@ -42,6 +42,8 @@ public class ClientClusterGetNodesRequest {
out.packArrayHeader(nodes.size());
for (ClusterNode node : nodes) {
+ out.packArrayHeader(4);
+
out.packString(node.id());
out.packString(node.name());
out.packString(node.address().host());
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 8ab6beaafa..e5b644b250 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -160,6 +160,9 @@ public class TcpIgniteClient implements IgniteClient {
List<ClusterNode> res = new ArrayList<>(cnt);
for (int i = 0; i < cnt; i++) {
+ int fieldCnt = r.in().unpackArrayHeader();
+ assert fieldCnt == 4;
+
res.add(new ClusterNode(
r.in().unpackString(),
r.in().unpackString(),
diff --git a/modules/platforms/cpp/CMakeLists.txt
b/modules/platforms/cpp/CMakeLists.txt
index ca7bc3c401..a12077763f 100644
--- a/modules/platforms/cpp/CMakeLists.txt
+++ b/modules/platforms/cpp/CMakeLists.txt
@@ -15,7 +15,7 @@
# limitations under the License.
#
-cmake_minimum_required(VERSION 3.10)
+cmake_minimum_required(VERSION 3.18)
project(Ignite.C++ VERSION 3 LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
diff --git a/modules/platforms/cpp/cmake/ignite_test.cmake
b/modules/platforms/cpp/cmake/ignite_test.cmake
index d80e604c7f..108fcf704e 100644
--- a/modules/platforms/cpp/cmake/ignite_test.cmake
+++ b/modules/platforms/cpp/cmake/ignite_test.cmake
@@ -38,5 +38,5 @@ function(ignite_test TEST_NAME TEST_SOURCE)
target_link_libraries(${TEST_NAME} ${IGNITE_TEST_LIBS} GTest::GTest
GTest::Main)
endif()
- gtest_discover_tests(${TEST_NAME})
+ gtest_discover_tests(${TEST_NAME} XML_OUTPUT_DIR
${CMAKE_BINARY_DIR}/Testing/Result)
endfunction()
diff --git a/modules/platforms/cpp/ignite/client/CMakeLists.txt
b/modules/platforms/cpp/ignite/client/CMakeLists.txt
index 53bf6486d8..e77c9f1f0f 100644
--- a/modules/platforms/cpp/ignite/client/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/client/CMakeLists.txt
@@ -21,6 +21,7 @@ set(TARGET ${PROJECT_NAME})
set(SOURCES
ignite_client.cpp
+ compute/compute.cpp
sql/sql.cpp
sql/result_set.cpp
table/key_value_view.cpp
@@ -30,8 +31,10 @@ set(SOURCES
transaction/transaction.cpp
transaction/transactions.cpp
detail/cluster_connection.cpp
+ detail/ignite_client_impl.cpp
detail/utils.cpp
detail/node_connection.cpp
+ detail/compute/compute_impl.cpp
detail/sql/sql_impl.cpp
detail/table/table_impl.cpp
detail/table/tables_impl.cpp
@@ -41,6 +44,8 @@ set(PUBLIC_HEADERS
ignite_client.h
ignite_client_configuration.h
ignite_logger.h
+ compute/compute.h
+ network/cluster_node.h
sql/sql.h
table/ignite_tuple.h
table/key_value_view.h
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp
b/modules/platforms/cpp/ignite/client/compute/compute.cpp
new file mode 100644
index 0000000000..54ccd519d8
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/client/compute/compute.h"
+#include "ignite/client/detail/argument_check_utils.h"
+#include "ignite/client/detail/compute/compute_impl.h"
+
+#include <random>
+
+namespace ignite {
+
+template<typename T>
+typename T::value_type get_random_element(const T &cont) {
+ static std::mutex randomMutex;
+ static std::random_device rd;
+ static std::mt19937 gen(rd());
+
+ assert(!cont.empty());
+
+ std::uniform_int_distribution<size_t> distrib(0, cont.size() - 1);
+
+ std::lock_guard<std::mutex> lock(randomMutex);
+
+ return cont[distrib(gen)];
+}
+
+void compute::execute_async(const std::vector<cluster_node> &nodes,
std::string_view job_class_name,
+ const std::vector<primitive> &args,
ignite_callback<std::optional<primitive>> callback) {
+ detail::arg_check::container_non_empty(nodes, "Nodes container");
+ detail::arg_check::container_non_empty(job_class_name, "Job class name");
+
+ m_impl->execute_on_one_node(get_random_element(nodes), job_class_name,
args, std::move(callback));
+}
+
+void compute::broadcast_async(const std::set<cluster_node> &nodes,
std::string_view job_class_name,
+ const std::vector<primitive> &args,
+ ignite_callback<std::map<cluster_node,
ignite_result<std::optional<primitive>>>> callback) {
+ typedef std::map<cluster_node, ignite_result<std::optional<primitive>>>
result_type;
+
+ detail::arg_check::container_non_empty(nodes, "Nodes set");
+ detail::arg_check::container_non_empty(job_class_name, "Job class name");
+
+ struct result_group {
+ explicit result_group(std::int32_t cnt, ignite_callback<result_type>
&&cb)
+ : m_cnt(cnt)
+ , m_callback(cb) {}
+
+ std::mutex m_mutex;
+ result_type m_res_map;
+ std::int32_t m_cnt{0};
+ ignite_callback<result_type> m_callback;
+ };
+
+ auto shared_res =
std::make_shared<result_group>(std::int32_t(nodes.size()), std::move(callback));
+
+ for (const auto &node : nodes) {
+ m_impl->execute_on_one_node(node, job_class_name, args, [node,
shared_res](auto &&res) {
+ auto &val = *shared_res;
+
+ std::lock_guard<std::mutex> lock(val.m_mutex);
+ val.m_res_map.emplace(node, res);
+ --val.m_cnt;
+ if (val.m_cnt == 0)
+ val.m_callback(std::move(val.m_res_map));
+ });
+ }
+}
+
+void compute::execute_colocated_async(std::string_view table_name, const
ignite_tuple &key,
+ std::string_view job_class_name, const std::vector<primitive> &args,
+ ignite_callback<std::optional<primitive>> callback) {
+ detail::arg_check::container_non_empty(table_name, "Table name");
+ detail::arg_check::tuple_non_empty(key, "Key tuple");
+ detail::arg_check::container_non_empty(job_class_name, "Job class name");
+
+ m_impl->execute_colocated_async(table_name, key, job_class_name, args,
std::move(callback));
+}
+
+} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.h
b/modules/platforms/cpp/ignite/client/compute/compute.h
new file mode 100644
index 0000000000..a75e7fa559
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/compute/compute.h
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/client/network/cluster_node.h"
+#include "ignite/client/primitive.h"
+#include "ignite/client/table/ignite_tuple.h"
+#include "ignite/client/transaction/transaction.h"
+#include "ignite/common/config.h"
+#include "ignite/common/ignite_result.h"
+
+#include <map>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+namespace ignite {
+
+namespace detail {
+class compute_impl;
+}
+
+/**
+ * Ignite Compute facade.
+ */
+class compute {
+ friend class ignite_client;
+
+public:
+ // Delete
+ compute() = delete;
+
+ /**
+ * Executes a compute job represented by the given class on one of the
specified nodes asynchronously.
+ *
+ * @param nodes Nodes to use for the job execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @param callback A callback called on operation completion with job
execution result.
+ */
+ IGNITE_API void execute_async(const std::vector<cluster_node> &nodes,
std::string_view job_class_name,
+ const std::vector<primitive> &args,
ignite_callback<std::optional<primitive>> callback);
+
+ /**
+ * Executes a compute job represented by the given class on one of the
specified nodes.
+ *
+ * @param nodes Nodes to use for the job execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @return Job execution result.
+ */
+ IGNITE_API std::optional<primitive> execute(
+ const std::vector<cluster_node> &nodes, std::string_view
job_class_name, const std::vector<primitive> &args) {
+ return sync<std::optional<primitive>>([this, nodes, job_class_name,
args](auto callback) mutable {
+ execute_async(nodes, job_class_name, args, std::move(callback));
+ });
+ }
+
+ /**
+ * Executes a compute job represented by the given class on all of the
specified nodes asynchronously.
+ *
+ * @param nodes Nodes to use for the job execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @param callback A callback called on operation completion with jobs
execution result.
+ */
+ IGNITE_API void broadcast_async(const std::set<cluster_node> &nodes,
std::string_view job_class_name,
+ const std::vector<primitive> &args,
+ ignite_callback<std::map<cluster_node,
ignite_result<std::optional<primitive>>>> callback);
+
+ /**
+ * Executes a compute job represented by the given class on one of the
specified nodes.
+ *
+ * @param nodes Nodes to use for the job execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @return Job execution result.
+ */
+ IGNITE_API std::map<cluster_node, ignite_result<std::optional<primitive>>>
broadcast(
+ const std::set<cluster_node> &nodes, std::string_view job_class_name,
const std::vector<primitive> &args) {
+ return sync<std::map<cluster_node,
ignite_result<std::optional<primitive>>>>(
+ [this, nodes, job_class_name, args](
+ auto callback) mutable { broadcast_async(nodes,
job_class_name, args, std::move(callback)); });
+ }
+
+ /**
+ * Asynchronously executes a job represented by the given class on one
node where the given key is located.
+ *
+ * @param tableName Name of the table to be used with @c key to determine
target node.
+ * @param key Table key to be used to determine the target node for job
execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @param callback A callback called on operation completion with job
execution result.
+ */
+ IGNITE_API void execute_colocated_async(std::string_view table_name, const
ignite_tuple &key,
+ std::string_view job_class_name, const std::vector<primitive> &args,
+ ignite_callback<std::optional<primitive>> callback);
+
+ /**
+ * Synchronously executes a job represented by the given class on one node
where the given key is located.
+ *
+ * @param tableName Name of the table to be used with @c key to determine
target node.
+ * @param key Table key to be used to determine the target node for job
execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @return Job execution result.
+ */
+ IGNITE_API std::optional<primitive> execute_colocated(std::string_view
table_name, const ignite_tuple &key,
+ std::string_view job_class_name, const std::vector<primitive> &args) {
+ return sync<std::optional<primitive>>([this, &table_name, &key,
job_class_name, &args](auto callback) mutable {
+ execute_colocated_async(table_name, key, job_class_name, args,
std::move(callback));
+ });
+ }
+
+private:
+ /**
+ * Constructor
+ *
+ * @param impl Implementation
+ */
+ explicit compute(std::shared_ptr<detail::compute_impl> impl)
+ : m_impl(std::move(impl)) {}
+
+ /** Implementation. */
+ std::shared_ptr<detail::compute_impl> m_impl;
+};
+
+} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/client/detail/argument_check_utils.h
b/modules/platforms/cpp/ignite/client/detail/argument_check_utils.h
new file mode 100644
index 0000000000..8c159ccfe7
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/detail/argument_check_utils.h
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/client/table/ignite_tuple.h"
+
+#include <string>
+
+namespace ignite::detail::arg_check {
+
+/**
+ * Check key argument.
+ *
+ * @param key Key tuple.
+ */
+void inline tuple_non_empty(const ignite_tuple &value, const std::string
&title) {
+ if (0 == value.column_count())
+ throw ignite_error(title + " can not be empty");
+}
+
+/**
+ * Check key argument.
+ *
+ * @param key Key tuple.
+ */
+void inline key_tuple_non_empty(const ignite_tuple &key) {
+ tuple_non_empty(key, "Key tuple");
+}
+
+/**
+ * Check value argument.
+ *
+ * @param value Value tuple.
+ */
+void inline value_tuple_non_empty(const ignite_tuple &value) {
+ tuple_non_empty(value, "Value tuple");
+}
+
+/**
+ * Check value argument.
+ *
+ * @param value Value tuple.
+ */
+template<typename T>
+void inline container_non_empty(const T &cont, const std::string &title) {
+ if (cont.empty())
+ throw ignite_error(title + " can not be empty");
+}
+
+} // namespace ignite::detail::arg_check
diff --git a/modules/platforms/cpp/ignite/client/detail/client_operation.h
b/modules/platforms/cpp/ignite/client/detail/client_operation.h
index 52295c774f..dc75b16387 100644
--- a/modules/platforms/cpp/ignite/client/detail/client_operation.h
+++ b/modules/platforms/cpp/ignite/client/detail/client_operation.h
@@ -89,6 +89,15 @@ enum class client_operation {
/** Rollback transaction. */
TX_ROLLBACK = 45,
+ /** Execute compute job. */
+ COMPUTE_EXECUTE = 47,
+
+ /** Get cluster nodes. */
+ CLUSTER_GET_NODES = 48,
+
+ /** Execute compute job. */
+ COMPUTE_EXECUTE_COLOCATED = 49,
+
/** Execute SQL query. */
SQL_EXEC = 50,
diff --git
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
new file mode 100644
index 0000000000..d3edf24873
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/client/detail/compute/compute_impl.h"
+#include "ignite/client/detail/utils.h"
+#include "ignite/schema/binary_tuple_builder.h"
+
+namespace ignite::detail {
+
+/**
+ * Write a collection of primitives as a binary tuple.
+ *
+ * @param writer Writer to use.
+ * @param args Arguments.
+ */
+void write_primitives_as_binary_tuple(protocol::writer &writer, const
std::vector<primitive> &args) {
+ auto args_num = std::int32_t(args.size());
+
+ writer.write(args_num);
+
+ binary_tuple_builder args_builder{args_num * 3};
+
+ args_builder.start();
+ for (const auto &arg : args) {
+ claim_primitive_with_type(args_builder, arg);
+ }
+
+ args_builder.layout();
+ for (const auto &arg : args) {
+ append_primitive_with_type(args_builder, arg);
+ }
+
+ auto args_data = args_builder.build();
+ writer.write_binary(args_data);
+}
+
+/**
+ * Read primitive from a stream, which is encoded as a binary tuple.
+ *
+ * @param reader Reader.
+ * @return Value.
+ */
+std::optional<primitive> read_primitive_from_binary_tuple(protocol::reader
&reader) {
+ auto tuple_data = reader.read_binary();
+ binary_tuple_parser parser(3, tuple_data);
+
+ auto typ =
ignite_type(binary_tuple_parser::get_int32(parser.get_next().value()));
+ auto scale = binary_tuple_parser::get_int32(parser.get_next().value());
+ return read_next_column(parser, typ, scale);
+}
+
+void compute_impl::execute_on_one_node(cluster_node node, std::string_view
job_class_name,
+ const std::vector<primitive> &args,
ignite_callback<std::optional<primitive>> callback) {
+
+ auto writer_func = [&node, job_class_name, args](protocol::writer &writer)
{
+ writer.write(node.get_name());
+ writer.write(job_class_name);
+ write_primitives_as_binary_tuple(writer, args);
+ };
+
+ auto reader_func = [](protocol::reader &reader) ->
std::optional<primitive> {
+ if (reader.try_read_nil())
+ return std::nullopt;
+
+ return read_primitive_from_binary_tuple(reader);
+ };
+
+ m_connection->perform_request<std::optional<primitive>>(
+ client_operation::COMPUTE_EXECUTE, writer_func,
std::move(reader_func), std::move(callback));
+}
+
+void compute_impl::execute_colocated_async(std::string_view table_name, const
ignite_tuple &key, std::string_view job,
+ const std::vector<primitive> &args,
ignite_callback<std::optional<primitive>> callback) {
+ m_tables->get_table_async(table_name,
+ [table_name = std::string(table_name), callback = std::move(callback),
key, job = std::string(job), args,
+ conn = m_connection](auto &&res) mutable {
+ if (res.has_error()) {
+ callback({std::move(res.error())});
+ return;
+ }
+ auto &table_opt = res.value();
+ if (!table_opt) {
+ callback({ignite_error("Table does not exist: '" + table_name
+ "'")});
+ return;
+ }
+
+ auto table = table_impl::from_facade(*table_opt);
+ table->template
with_latest_schema_async<std::optional<primitive>>(std::move(callback),
+ [table, key = std::move(key), job = std::move(job), args =
std::move(args),
+ conn] // NOLINT(performance-move-const-arg)
+ (const schema &sch, auto callback) mutable {
+ auto writer_func = [&key, &sch, &table, &job,
&args](protocol::writer &writer) {
+ writer.write(table->get_id());
+ writer.write(sch.version);
+ write_tuple(writer, sch, key, true);
+ writer.write(job);
+ write_primitives_as_binary_tuple(writer, args);
+ };
+
+ auto reader_func = [](protocol::reader &reader) ->
std::optional<primitive> {
+ if (reader.try_read_nil())
+ return std::nullopt;
+
+ return read_primitive_from_binary_tuple(reader);
+ };
+
+
conn->perform_request<std::optional<primitive>>(client_operation::COMPUTE_EXECUTE_COLOCATED,
+ writer_func, std::move(reader_func),
std::move(callback));
+ });
+ });
+}
+
+} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
new file mode 100644
index 0000000000..122be3c66f
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/client/detail/cluster_connection.h"
+#include "ignite/client/detail/table/tables_impl.h"
+#include "ignite/client/network/cluster_node.h"
+#include "ignite/client/primitive.h"
+#include "ignite/client/table/ignite_tuple.h"
+#include "ignite/common/ignite_result.h"
+
+#include <memory>
+#include <utility>
+
+namespace ignite::detail {
+
+/**
+ * Ignite Compute implementation.
+ */
+class compute_impl {
+ friend class ignite_client;
+
+public:
+ /**
+ * Constructor.
+ *
+ * @param connection Connection.
+ */
+ explicit compute_impl(std::shared_ptr<cluster_connection> connection,
std::shared_ptr<tables_impl> tables)
+ : m_connection(std::move(connection))
+ , m_tables(std::move(tables)) {}
+
+ /**
+ * Executes a compute job represented by the given class on the specified
node asynchronously.
+ *
+ * @param node Node to use for the job execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @param callback A callback called on operation completion with job
execution result.
+ */
+ void execute_on_one_node(cluster_node node, std::string_view
job_class_name, const std::vector<primitive> &args,
+ ignite_callback<std::optional<primitive>> callback);
+
+ /**
+ * Asynchronously executes a job represented by the given class on one
node where the given key is located.
+ *
+ * @param tableName Name of the table to be used with @c key to determine
target node.
+ * @param key Table key to be used to determine the target node for job
execution.
+ * @param job_class_name Java class name of the job to execute.
+ * @param args Job arguments.
+ * @param callback A callback called on operation completion with job
execution result.
+ */
+ void execute_colocated_async(std::string_view table_name, const
ignite_tuple &key, std::string_view job_class_name,
+ const std::vector<primitive> &args,
ignite_callback<std::optional<primitive>> callback);
+
+private:
+ /** Cluster connection. */
+ std::shared_ptr<cluster_connection> m_connection;
+
+ /** Tables. */
+ std::shared_ptr<tables_impl> m_tables;
+};
+
+} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
new file mode 100644
index 0000000000..9e1e324e0e
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/client/detail/ignite_client_impl.h"
+
+#include <ignite/protocol/utils.h>
+
+namespace ignite::detail {
+
+void
ignite_client_impl::get_cluster_nodes_async(ignite_callback<std::vector<cluster_node>>
callback) {
+ auto reader_func = [](protocol::reader &reader) ->
std::vector<cluster_node> {
+ std::vector<cluster_node> nodes;
+ nodes.reserve(reader.read_array_size());
+
+ reader.read_array_raw([&nodes](auto, const msgpack_object &object) {
+ auto fields = object.via.array;
+ assert(fields.size >= 4);
+
+ auto id = protocol::unpack_object<std::string>(fields.ptr[0]);
+ auto name = protocol::unpack_object<std::string>(fields.ptr[1]);
+ auto host = protocol::unpack_object<std::string>(fields.ptr[2]);
+ auto port = protocol::unpack_object<std::int32_t>(fields.ptr[3]);
+
+ nodes.emplace_back(
+ std::move(id), std::move(name),
network::end_point{std::move(host), std::uint16_t(port)});
+ });
+
+ return nodes;
+ };
+
+ m_connection->perform_request_rd<std::vector<cluster_node>>(
+ client_operation::CLUSTER_GET_NODES, std::move(reader_func),
std::move(callback));
+}
+
+} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.h
b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.h
index 0d23ce681e..65fe6a0a42 100644
--- a/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.h
@@ -18,6 +18,7 @@
#pragma once
#include <ignite/client/detail/cluster_connection.h>
+#include <ignite/client/detail/compute/compute_impl.h>
#include <ignite/client/detail/sql/sql_impl.h>
#include <ignite/client/detail/table/tables_impl.h>
#include <ignite/client/detail/transaction/transactions_impl.h>
@@ -52,6 +53,7 @@ public:
, m_connection(cluster_connection::create(m_configuration))
, m_tables(std::make_shared<tables_impl>(m_connection))
, m_sql(std::make_shared<sql_impl>(m_connection))
+ , m_compute(std::make_shared<compute_impl>(m_connection, m_tables))
, m_transactions(std::make_shared<transactions_impl>(m_connection)) {}
/**
@@ -93,6 +95,13 @@ public:
*/
[[nodiscard]] std::shared_ptr<sql_impl> get_sql_impl() const { return
m_sql; }
+ /**
+ * Get Compute management API implementation.
+ *
+ * @return Compute management API implementation.
+ */
+ [[nodiscard]] std::shared_ptr<compute_impl> get_compute_impl() const {
return m_compute; }
+
/**
* Get transactions management API implementation.
*
@@ -100,6 +109,14 @@ public:
*/
[[nodiscard]] std::shared_ptr<transactions_impl> get_transactions_impl()
const { return m_transactions; }
+ /**
+ * Gets the cluster nodes asynchronously.
+ * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+ *
+ * @param callback Callback called with the list of cluster nodes upon
success.
+ */
+ void get_cluster_nodes_async(ignite_callback<std::vector<cluster_node>>
callback);
+
private:
/** Configuration. */
const ignite_client_configuration m_configuration;
@@ -113,6 +130,9 @@ private:
/** SQL. */
std::shared_ptr<sql_impl> m_sql;
+ /** Compute. */
+ std::shared_ptr<compute_impl> m_compute;
+
/** Transactions. */
std::shared_ptr<transactions_impl> m_transactions;
};
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index 5811331b6e..a17c2b944f 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "node_connection.h"
+#include "ignite/client/detail/node_connection.h"
#include <ignite/protocol/utils.h>
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
index 221aa2238d..eb6b3a4db9 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.cpp
@@ -18,226 +18,17 @@
#include "ignite/client/detail/table/table_impl.h"
#include "ignite/client/detail/transaction/transaction_impl.h"
#include "ignite/client/detail/utils.h"
+#include "ignite/client/table/table.h"
#include "ignite/common/bits.h"
#include "ignite/common/ignite_error.h"
#include "ignite/protocol/bitset_span.h"
#include "ignite/protocol/reader.h"
#include "ignite/protocol/writer.h"
-#include "ignite/schema/binary_tuple_builder.h"
#include "ignite/schema/binary_tuple_parser.h"
namespace ignite::detail {
-/**
- * Claim space for the column.
- *
- * @param builder Binary tuple builder.
- * @param typ Column type.
- * @param value Value.
- * @param scale Column scale.
- */
-void claim_column(binary_tuple_builder &builder, ignite_type typ, const
primitive &value, std::int32_t scale) {
- switch (typ) {
- case ignite_type::INT8:
- builder.claim_int8(value.get<std::int8_t>());
- break;
- case ignite_type::INT16:
- builder.claim_int16(value.get<std::int16_t>());
- break;
- case ignite_type::INT32:
- builder.claim_int32(value.get<std::int32_t>());
- break;
- case ignite_type::INT64:
- builder.claim_int64(value.get<std::int64_t>());
- break;
- case ignite_type::FLOAT:
- builder.claim_float(value.get<float>());
- break;
- case ignite_type::DOUBLE:
- builder.claim_double(value.get<double>());
- break;
- case ignite_type::UUID:
- builder.claim_uuid(value.get<uuid>());
- break;
- case ignite_type::STRING:
- builder.claim_string(value.get<std::string>());
- break;
- case ignite_type::BINARY:
- builder.claim_bytes(value.get<std::vector<std::byte>>());
- break;
- case ignite_type::DECIMAL: {
- big_decimal to_write;
- value.get<big_decimal>().set_scale(scale, to_write);
- builder.claim_number(to_write);
- break;
- }
- case ignite_type::NUMBER:
- builder.claim_number(value.get<big_integer>());
- break;
- case ignite_type::DATE:
- builder.claim_date(value.get<ignite_date>());
- break;
- case ignite_type::TIME:
- builder.claim_time(value.get<ignite_time>());
- break;
- case ignite_type::DATETIME:
- builder.claim_date_time(value.get<ignite_date_time>());
- break;
- case ignite_type::TIMESTAMP:
- builder.claim_timestamp(value.get<ignite_timestamp>());
- break;
- case ignite_type::BITMASK:
- builder.claim_bytes(value.get<bit_array>().get_raw());
- break;
- default:
- throw ignite_error("Type with id " + std::to_string(int(typ)) + "
is not yet supported");
- }
-}
-
-/**
- * Append column value to binary tuple.
- *
- * @param builder Binary tuple builder.
- * @param typ Column type.
- * @param value Value.
- * @param scale Column scale.
- */
-void append_column(binary_tuple_builder &builder, ignite_type typ, const
primitive &value, std::int32_t scale) {
- switch (typ) {
- case ignite_type::INT8:
- builder.append_int8(value.get<std::int8_t>());
- break;
- case ignite_type::INT16:
- builder.append_int16(value.get<std::int16_t>());
- break;
- case ignite_type::INT32:
- builder.append_int32(value.get<std::int32_t>());
- break;
- case ignite_type::INT64:
- builder.append_int64(value.get<std::int64_t>());
- break;
- case ignite_type::FLOAT:
- builder.append_float(value.get<float>());
- break;
- case ignite_type::DOUBLE:
- builder.append_double(value.get<double>());
- break;
- case ignite_type::UUID:
- builder.append_uuid(value.get<uuid>());
- break;
- case ignite_type::STRING:
- builder.append_string(value.get<std::string>());
- break;
- case ignite_type::BINARY:
- builder.append_bytes(value.get<std::vector<std::byte>>());
- break;
- case ignite_type::DECIMAL: {
- big_decimal to_write;
- value.get<big_decimal>().set_scale(scale, to_write);
- builder.append_number(to_write);
- break;
- }
- case ignite_type::NUMBER:
- builder.append_number(value.get<big_integer>());
- break;
- case ignite_type::DATE:
- builder.append_date(value.get<ignite_date>());
- break;
- case ignite_type::TIME:
- builder.append_time(value.get<ignite_time>());
- break;
- case ignite_type::DATETIME:
- builder.append_date_time(value.get<ignite_date_time>());
- break;
- case ignite_type::TIMESTAMP:
- builder.append_timestamp(value.get<ignite_timestamp>());
- break;
- case ignite_type::BITMASK:
- builder.append_bytes(value.get<bit_array>().get_raw());
- break;
- default:
- throw ignite_error("Type with id " + std::to_string(int(typ)) + "
is not yet supported");
- }
-}
-
-/**
- * Serialize tuple using table schema.
- *
- * @param sch Schema.
- * @param tuple Tuple.
- * @param key_only Should only key fields be serialized.
- * @param no_value No value bitset.
- * @return Serialized binary tuple.
- */
-std::vector<std::byte> pack_tuple(
- const schema &sch, const ignite_tuple &tuple, bool key_only,
protocol::bitset_span &no_value) {
- auto count = std::int32_t(key_only ? sch.key_column_count :
sch.columns.size());
- binary_tuple_builder builder{count};
-
- builder.start();
-
- for (std::int32_t i = 0; i < count; ++i) {
- const auto &col = sch.columns[i];
- auto col_idx = tuple.column_ordinal(col.name);
-
- if (col_idx >= 0)
- claim_column(builder, col.type, tuple.get(col_idx), col.scale);
- else
- builder.claim(std::nullopt);
- }
-
- builder.layout();
- for (std::int32_t i = 0; i < count; ++i) {
- const auto &col = sch.columns[i];
- auto col_idx = tuple.column_ordinal(col.name);
-
- if (col_idx >= 0)
- append_column(builder, col.type, tuple.get(col_idx), col.scale);
- else {
- builder.append(std::nullopt);
- no_value.set(std::size_t(i));
- }
- }
-
- return builder.build();
-}
-
-/**
- * Write tuple using table schema and writer.
- *
- * @param writer Writer.
- * @param sch Schema.
- * @param tuple Tuple.
- * @param key_only Should only key fields be written or not.
- */
-void write_tuple(protocol::writer &writer, const schema &sch, const
ignite_tuple &tuple, bool key_only) {
- const std::size_t count = key_only ? sch.key_column_count :
sch.columns.size();
- const std::size_t bytes_num = bytes_for_bits(count);
-
- auto no_value_bytes = reinterpret_cast<std::byte *>(alloca(bytes_num));
- protocol::bitset_span no_value(no_value_bytes, bytes_num);
-
- auto tuple_data = pack_tuple(sch, tuple, key_only, no_value);
-
- writer.write_bitset(no_value.data());
- writer.write_binary(tuple_data);
-}
-
-/**
- * Write tuples using table schema and writer.
- *
- * @param writer Writer.
- * @param sch Schema.
- * @param tuples Tuples.
- * @param key_only Should only key fields be written or not.
- */
-void write_tuples(protocol::writer &writer, const schema &sch, const
std::vector<ignite_tuple> &tuples, bool key_only) {
- writer.write(std::int32_t(tuples.size()));
- for (auto &tuple : tuples)
- write_tuple(writer, sch, tuple, key_only);
-}
-
/**
* Write table operation header.
*
@@ -696,4 +487,8 @@ void table_impl::remove_all_exact_async(
});
}
+std::shared_ptr<table_impl> table_impl::from_facade(table &tb) {
+ return tb.m_impl;
+}
+
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
index ed69979038..4a6395910a 100644
--- a/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/table/table_impl.h
@@ -27,6 +27,10 @@
#include <mutex>
#include <unordered_map>
+namespace ignite {
+class table;
+}
+
namespace ignite::detail {
/**
@@ -284,6 +288,21 @@ public:
void remove_all_exact_async(
transaction *tx, std::vector<ignite_tuple> records,
ignite_callback<std::vector<ignite_tuple>> callback);
+ /**
+ * Extract implementation from facade.
+ *
+ * @param tb Table.
+ * @return Implementation.
+ */
+ [[nodiscard]] static std::shared_ptr<table_impl> from_facade(table &tb);
+
+ /**
+ * Get table ID.
+ *
+ * @return ID.
+ */
+ [[nodiscard]] uuid get_id() const { return m_id; }
+
private:
/**
* Load latest schema from server asynchronously.
diff --git a/modules/platforms/cpp/ignite/client/detail/utils.cpp
b/modules/platforms/cpp/ignite/client/detail/utils.cpp
index 42a68ba7d9..6f1fab8f02 100644
--- a/modules/platforms/cpp/ignite/client/detail/utils.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/utils.cpp
@@ -16,12 +16,187 @@
*/
#include "ignite/client/detail/utils.h"
+#include "ignite/common/bits.h"
#include "ignite/common/uuid.h"
#include <string>
namespace ignite::detail {
+/**
+ * Claim space for the column.
+ *
+ * @param builder Binary tuple builder.
+ * @param typ Column type.
+ * @param value Value.
+ * @param scale Column scale.
+ */
+void claim_column(binary_tuple_builder &builder, ignite_type typ, const
primitive &value, std::int32_t scale) {
+ switch (typ) {
+ case ignite_type::INT8:
+ builder.claim_int8(value.get<std::int8_t>());
+ break;
+ case ignite_type::INT16:
+ builder.claim_int16(value.get<std::int16_t>());
+ break;
+ case ignite_type::INT32:
+ builder.claim_int32(value.get<std::int32_t>());
+ break;
+ case ignite_type::INT64:
+ builder.claim_int64(value.get<std::int64_t>());
+ break;
+ case ignite_type::FLOAT:
+ builder.claim_float(value.get<float>());
+ break;
+ case ignite_type::DOUBLE:
+ builder.claim_double(value.get<double>());
+ break;
+ case ignite_type::UUID:
+ builder.claim_uuid(value.get<uuid>());
+ break;
+ case ignite_type::STRING:
+ builder.claim_string(value.get<std::string>());
+ break;
+ case ignite_type::BINARY:
+ builder.claim_bytes(value.get<std::vector<std::byte>>());
+ break;
+ case ignite_type::DECIMAL: {
+ big_decimal to_write;
+ value.get<big_decimal>().set_scale(scale, to_write);
+ builder.claim_number(to_write);
+ break;
+ }
+ case ignite_type::NUMBER:
+ builder.claim_number(value.get<big_integer>());
+ break;
+ case ignite_type::DATE:
+ builder.claim_date(value.get<ignite_date>());
+ break;
+ case ignite_type::TIME:
+ builder.claim_time(value.get<ignite_time>());
+ break;
+ case ignite_type::DATETIME:
+ builder.claim_date_time(value.get<ignite_date_time>());
+ break;
+ case ignite_type::TIMESTAMP:
+ builder.claim_timestamp(value.get<ignite_timestamp>());
+ break;
+ case ignite_type::BITMASK:
+ builder.claim_bytes(value.get<bit_array>().get_raw());
+ break;
+ default:
+ throw ignite_error("Type with id " + std::to_string(int(typ)) + "
is not yet supported");
+ }
+}
+
+/**
+ * Append column value to binary tuple.
+ *
+ * @param builder Binary tuple builder.
+ * @param typ Column type.
+ * @param value Value.
+ * @param scale Column scale.
+ */
+void append_column(binary_tuple_builder &builder, ignite_type typ, const
primitive &value, std::int32_t scale) {
+ switch (typ) {
+ case ignite_type::INT8:
+ builder.append_int8(value.get<std::int8_t>());
+ break;
+ case ignite_type::INT16:
+ builder.append_int16(value.get<std::int16_t>());
+ break;
+ case ignite_type::INT32:
+ builder.append_int32(value.get<std::int32_t>());
+ break;
+ case ignite_type::INT64:
+ builder.append_int64(value.get<std::int64_t>());
+ break;
+ case ignite_type::FLOAT:
+ builder.append_float(value.get<float>());
+ break;
+ case ignite_type::DOUBLE:
+ builder.append_double(value.get<double>());
+ break;
+ case ignite_type::UUID:
+ builder.append_uuid(value.get<uuid>());
+ break;
+ case ignite_type::STRING:
+ builder.append_string(value.get<std::string>());
+ break;
+ case ignite_type::BINARY:
+ builder.append_bytes(value.get<std::vector<std::byte>>());
+ break;
+ case ignite_type::DECIMAL: {
+ big_decimal to_write;
+ value.get<big_decimal>().set_scale(scale, to_write);
+ builder.append_number(to_write);
+ break;
+ }
+ case ignite_type::NUMBER:
+ builder.append_number(value.get<big_integer>());
+ break;
+ case ignite_type::DATE:
+ builder.append_date(value.get<ignite_date>());
+ break;
+ case ignite_type::TIME:
+ builder.append_time(value.get<ignite_time>());
+ break;
+ case ignite_type::DATETIME:
+ builder.append_date_time(value.get<ignite_date_time>());
+ break;
+ case ignite_type::TIMESTAMP:
+ builder.append_timestamp(value.get<ignite_timestamp>());
+ break;
+ case ignite_type::BITMASK:
+ builder.append_bytes(value.get<bit_array>().get_raw());
+ break;
+ default:
+ throw ignite_error("Type with id " + std::to_string(int(typ)) + "
is not yet supported");
+ }
+}
+
+/**
+ * Serialize tuple using table schema.
+ *
+ * @param sch Schema.
+ * @param tuple Tuple.
+ * @param key_only Should only key fields be serialized.
+ * @param no_value No value bitset.
+ * @return Serialized binary tuple.
+ */
+std::vector<std::byte> pack_tuple(
+ const schema &sch, const ignite_tuple &tuple, bool key_only,
protocol::bitset_span &no_value) {
+ auto count = std::int32_t(key_only ? sch.key_column_count :
sch.columns.size());
+ binary_tuple_builder builder{count};
+
+ builder.start();
+
+ for (std::int32_t i = 0; i < count; ++i) {
+ const auto &col = sch.columns[i];
+ auto col_idx = tuple.column_ordinal(col.name);
+
+ if (col_idx >= 0)
+ claim_column(builder, col.type, tuple.get(col_idx), col.scale);
+ else
+ builder.claim(std::nullopt);
+ }
+
+ builder.layout();
+ for (std::int32_t i = 0; i < count; ++i) {
+ const auto &col = sch.columns[i];
+ auto col_idx = tuple.column_ordinal(col.name);
+
+ if (col_idx >= 0)
+ append_column(builder, col.type, tuple.get(col_idx), col.scale);
+ else {
+ builder.append(std::nullopt);
+ no_value.set(std::size_t(i));
+ }
+ }
+
+ return builder.build();
+}
+
/**
* Claim type and scale header for a value written in binary tuple.
*
@@ -47,6 +222,13 @@ void append_type_and_scale(binary_tuple_builder &builder,
ignite_type typ, std::
}
void claim_primitive_with_type(binary_tuple_builder &builder, const primitive
&value) {
+ if (value.is_null()) {
+ builder.claim(std::nullopt); // Type.
+ builder.claim(std::nullopt); // Scale.
+ builder.claim(std::nullopt); // Value.
+ return;
+ }
+
switch (value.get_type()) {
case column_type::BOOLEAN: {
claim_type_and_scale(builder, ignite_type::INT8);
@@ -99,22 +281,57 @@ void claim_primitive_with_type(binary_tuple_builder
&builder, const primitive &v
builder.claim(ignite_type::BINARY, data);
break;
}
-
- case column_type::DECIMAL:
- case column_type::DATE:
- case column_type::TIME:
- case column_type::DATETIME:
- case column_type::TIMESTAMP:
- case column_type::BITMASK:
+ case column_type::DECIMAL: {
+ const auto &dec_value = value.get<big_decimal>();
+ claim_type_and_scale(builder, ignite_type::DECIMAL,
dec_value.get_scale());
+ builder.claim_number(dec_value);
+ break;
+ }
+ case column_type::NUMBER: {
+ claim_type_and_scale(builder, ignite_type::NUMBER);
+ builder.claim_number(value.get<big_integer>());
+ break;
+ }
+ case column_type::DATE: {
+ claim_type_and_scale(builder, ignite_type::DATE);
+ builder.claim_date(value.get<ignite_date>());
+ break;
+ }
+ case column_type::TIME: {
+ claim_type_and_scale(builder, ignite_type::TIME);
+ builder.claim_time(value.get<ignite_time>());
+ break;
+ }
+ case column_type::DATETIME: {
+ claim_type_and_scale(builder, ignite_type::DATETIME);
+ builder.claim_date_time(value.get<ignite_date_time>());
+ break;
+ }
+ case column_type::TIMESTAMP: {
+ claim_type_and_scale(builder, ignite_type::TIMESTAMP);
+ builder.claim_timestamp(value.get<ignite_timestamp>());
+ break;
+ }
+ case column_type::BITMASK: {
+ claim_type_and_scale(builder, ignite_type::BITMASK);
+ builder.claim_bytes(value.get<bit_array>().get_raw());
+ break;
+ }
case column_type::PERIOD:
case column_type::DURATION:
- case column_type::NUMBER:
default:
throw ignite_error("Unsupported type: " +
std::to_string(int(value.get_type())));
}
}
void append_primitive_with_type(binary_tuple_builder &builder, const primitive
&value) {
+ if (value.is_null()) {
+ builder.append(std::nullopt); // Type.
+ builder.append(std::nullopt); // Scale.
+ builder.append(std::nullopt); // Value.
+ return;
+ }
+
switch (value.get_type()) {
case column_type::BOOLEAN: {
append_type_and_scale(builder, ignite_type::INT8);
@@ -167,16 +384,44 @@ void append_primitive_with_type(binary_tuple_builder
&builder, const primitive &
builder.append(ignite_type::BINARY, data);
break;
}
-
- case column_type::DECIMAL:
- case column_type::DATE:
- case column_type::TIME:
- case column_type::DATETIME:
- case column_type::TIMESTAMP:
- case column_type::BITMASK:
+ case column_type::DECIMAL: {
+ const auto &dec_value = value.get<big_decimal>();
+ append_type_and_scale(builder, ignite_type::DECIMAL,
dec_value.get_scale());
+ builder.append_number(dec_value);
+ break;
+ }
+ case column_type::NUMBER: {
+ append_type_and_scale(builder, ignite_type::NUMBER);
+ builder.append_number(value.get<big_integer>());
+ break;
+ }
+ case column_type::DATE: {
+ append_type_and_scale(builder, ignite_type::DATE);
+ builder.append_date(value.get<ignite_date>());
+ break;
+ }
+ case column_type::TIME: {
+ append_type_and_scale(builder, ignite_type::TIME);
+ builder.append_time(value.get<ignite_time>());
+ break;
+ }
+ case column_type::DATETIME: {
+ append_type_and_scale(builder, ignite_type::DATETIME);
+ builder.append_date_time(value.get<ignite_date_time>());
+ break;
+ }
+ case column_type::TIMESTAMP: {
+ append_type_and_scale(builder, ignite_type::TIMESTAMP);
+ builder.append_timestamp(value.get<ignite_timestamp>());
+ break;
+ }
+ case column_type::BITMASK: {
+ append_type_and_scale(builder, ignite_type::BITMASK);
+ builder.append_bytes(value.get<bit_array>().get_raw());
+ break;
+ }
case column_type::PERIOD:
case column_type::DURATION:
- case column_type::NUMBER:
default:
throw ignite_error("Unsupported type: " +
std::to_string(int(value.get_type())));
}
@@ -292,4 +537,23 @@ ignite_tuple concat(const ignite_tuple &left, const
ignite_tuple &right) {
return res;
}
+void write_tuple(protocol::writer &writer, const schema &sch, const
ignite_tuple &tuple, bool key_only) {
+ const std::size_t count = key_only ? sch.key_column_count :
sch.columns.size();
+ const std::size_t bytes_num = bytes_for_bits(count);
+
+ auto no_value_bytes = reinterpret_cast<std::byte *>(alloca(bytes_num));
+ protocol::bitset_span no_value(no_value_bytes, bytes_num);
+
+ auto tuple_data = pack_tuple(sch, tuple, key_only, no_value);
+
+ writer.write_bitset(no_value.data());
+ writer.write_binary(tuple_data);
+}
+
+void write_tuples(protocol::writer &writer, const schema &sch, const
std::vector<ignite_tuple> &tuples, bool key_only) {
+ writer.write(std::int32_t(tuples.size()));
+ for (auto &tuple : tuples)
+ write_tuple(writer, sch, tuple, key_only);
+}
+
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/detail/utils.h
b/modules/platforms/cpp/ignite/client/detail/utils.h
index fc248954a1..6715cb409f 100644
--- a/modules/platforms/cpp/ignite/client/detail/utils.h
+++ b/modules/platforms/cpp/ignite/client/detail/utils.h
@@ -17,9 +17,12 @@
#pragma once
+#include "ignite/client/detail/table/schema.h"
#include "ignite/client/primitive.h"
#include "ignite/client/table/ignite_tuple.h"
#include "ignite/client/transaction/transaction.h"
+
+#include "ignite/protocol/writer.h"
#include "ignite/schema/binary_tuple_builder.h"
#include "ignite/schema/binary_tuple_parser.h"
#include "ignite/schema/ignite_type.h"
@@ -71,4 +74,24 @@ void append_primitive_with_type(binary_tuple_builder
&builder, const primitive &
*/
[[nodiscard]] ignite_tuple concat(const ignite_tuple &left, const ignite_tuple
&right);
+/**
+ * Write tuple using table schema and writer.
+ *
+ * @param writer Writer.
+ * @param sch Schema.
+ * @param tuple Tuple.
+ * @param key_only Should only key fields be written or not.
+ */
+void write_tuple(protocol::writer &writer, const schema &sch, const
ignite_tuple &tuple, bool key_only);
+
+/**
+ * Write tuples using table schema and writer.
+ *
+ * @param writer Writer.
+ * @param sch Schema.
+ * @param tuples Tuples.
+ * @param key_only Should only key fields be written or not.
+ */
+void write_tuples(protocol::writer &writer, const schema &sch, const
std::vector<ignite_tuple> &tuples, bool key_only);
+
} // namespace ignite::detail
diff --git a/modules/platforms/cpp/ignite/client/ignite_client.cpp
b/modules/platforms/cpp/ignite/client/ignite_client.cpp
index 8c33215b60..16743e5bd9 100644
--- a/modules/platforms/cpp/ignite/client/ignite_client.cpp
+++ b/modules/platforms/cpp/ignite/client/ignite_client.cpp
@@ -74,10 +74,23 @@ sql ignite_client::get_sql() const noexcept {
return sql(impl().get_sql_impl());
}
+compute ignite_client::get_compute() const noexcept {
+ return compute(impl().get_compute_impl());
+}
+
transactions ignite_client::get_transactions() const noexcept {
return transactions(impl().get_transactions_impl());
}
+void
ignite_client::get_cluster_nodes_async(ignite_callback<std::vector<cluster_node>>
callback) {
+ return impl().get_cluster_nodes_async(std::move(callback));
+}
+
+std::vector<cluster_node> ignite_client::get_cluster_nodes() {
+ return sync<std::vector<cluster_node>>(
+ [this](auto callback) mutable {
get_cluster_nodes_async(std::move(callback)); });
+}
+
detail::ignite_client_impl &ignite_client::impl() noexcept {
return *((detail::ignite_client_impl *) (m_impl.get()));
}
diff --git a/modules/platforms/cpp/ignite/client/ignite_client.h
b/modules/platforms/cpp/ignite/client/ignite_client.h
index acc02807bb..b20e10e8dd 100644
--- a/modules/platforms/cpp/ignite/client/ignite_client.h
+++ b/modules/platforms/cpp/ignite/client/ignite_client.h
@@ -17,7 +17,9 @@
#pragma once
+#include "ignite/client/compute/compute.h"
#include "ignite/client/ignite_client_configuration.h"
+#include "ignite/client/network/cluster_node.h"
#include "ignite/client/sql/sql.h"
#include "ignite/client/table/tables.h"
#include "ignite/client/transaction/transactions.h"
@@ -108,6 +110,13 @@ public:
*/
[[nodiscard]] IGNITE_API sql get_sql() const noexcept;
+ /**
+ * Gets the Compute API.
+ *
+ * @return Compute API.
+ */
+ [[nodiscard]] IGNITE_API compute get_compute() const noexcept;
+
/**
* Gets the Transactions API.
*
@@ -115,6 +124,22 @@ public:
*/
[[nodiscard]] IGNITE_API transactions get_transactions() const noexcept;
+ /**
+ * Gets the cluster nodes asynchronously.
+ * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+ *
+ * @param callback Callback called with the list of cluster nodes upon
success.
+ */
+ IGNITE_API void
get_cluster_nodes_async(ignite_callback<std::vector<cluster_node>> callback);
+
+ /**
+ * Gets the cluster nodes.
+ * NOTE: Temporary API to enable Compute until we have proper Cluster API.
+ *
+ * @return The list of cluster nodes upon success.
+ */
+ [[nodiscard]] IGNITE_API std::vector<cluster_node> get_cluster_nodes();
+
private:
/**
* Constructor
diff --git a/modules/platforms/cpp/ignite/network/end_point.h
b/modules/platforms/cpp/ignite/client/network/cluster_node.h
similarity index 50%
copy from modules/platforms/cpp/ignite/network/end_point.h
copy to modules/platforms/cpp/ignite/client/network/cluster_node.h
index 22b0ce67b0..047ed443e7 100644
--- a/modules/platforms/cpp/ignite/network/end_point.h
+++ b/modules/platforms/cpp/ignite/client/network/cluster_node.h
@@ -17,34 +17,55 @@
#pragma once
+#include "ignite/common/end_point.h"
+
#include <cstdint>
-#include <string>
+#include <type_traits>
+#include <variant>
+#include <vector>
-namespace ignite::network {
+namespace ignite {
/**
- * Connection end point structure.
+ * Ignite cluster node.
*/
-struct end_point {
+class cluster_node {
+public:
// Default
- end_point() = default;
+ cluster_node() = default;
/**
* Constructor.
*
- * @param host Host.
- * @param port Port.
+ * @param id Local ID.
+ * @param name Name.
+ * @param address Address.
+ */
+ cluster_node(std::string id, std::string name, network::end_point address)
+ : m_id(std::move(id))
+ , m_name(std::move(name))
+ , m_address(std::move(address)) {}
+
+ /**
+ * Gets the local node id. Changes after node restart.
+ *
+ * @return Local node id.
*/
- end_point(std::string host, uint16_t port)
- : host(std::move(host))
- , port(port) {}
+ [[nodiscard]] const std::string &get_id() const { return m_id; }
/**
- * Convert to string.
+ * Gets the unique name of the cluster member. Does not change after node
restart.
*
- * @return String form.
+ * @return Unique name of the cluster member.
*/
- [[nodiscard]] std::string to_string() const { return host + ":" +
std::to_string(port); }
+ [[nodiscard]] const std::string &get_name() const { return m_name; }
+
+ /**
+ * Gets the node address.
+ *
+ * @return Node address.
+ */
+ [[nodiscard]] const network::end_point &get_address() const { return
m_address; }
/**
* Compare to another instance.
@@ -53,21 +74,27 @@ struct end_point {
* @return Negative value if less, positive if larger and zero, if equals
* another instance.
*/
- [[nodiscard]] int compare(const end_point &other) const {
- if (port < other.port)
- return -1;
+ [[nodiscard]] int compare(const cluster_node &other) const {
+ auto name_comp = m_name.compare(other.m_name);
+ if (name_comp)
+ return name_comp;
- if (port > other.port)
- return 1;
+ auto id_comp = m_id.compare(other.m_id);
+ if (id_comp)
+ return id_comp;
- return host.compare(other.host);
+ return m_address.compare(other.m_address);
}
- /** Remote host. */
- std::string host;
+private:
+ /** Local ID. */
+ std::string m_id{};
+
+ /** Name. */
+ std::string m_name{};
- /** TCP port. */
- uint16_t port = 0;
+ /** Address. */
+ network::end_point m_address{};
};
/**
@@ -77,8 +104,8 @@ struct end_point {
* @param val2 Second value.
* @return True if equal.
*/
-inline bool operator==(const end_point &val1, const end_point &val2) {
- return val1.port == val2.port && val1.host == val2.host;
+inline bool operator==(const cluster_node &val1, const cluster_node &val2) {
+ return val1.compare(val2) == 0;
}
/**
@@ -88,7 +115,7 @@ inline bool operator==(const end_point &val1, const
end_point &val2) {
* @param val2 Second value.
* @return True if not equal.
*/
-inline bool operator!=(const end_point &val1, const end_point &val2) {
+inline bool operator!=(const cluster_node &val1, const cluster_node &val2) {
return !(val1 == val2);
}
@@ -99,7 +126,7 @@ inline bool operator!=(const end_point &val1, const
end_point &val2) {
* @param val2 Second value.
* @return True if less.
*/
-inline bool operator<(const end_point &val1, const end_point &val2) {
+inline bool operator<(const cluster_node &val1, const cluster_node &val2) {
return val1.compare(val2) < 0;
}
@@ -110,7 +137,7 @@ inline bool operator<(const end_point &val1, const
end_point &val2) {
* @param val2 Second value.
* @return True if less or equal.
*/
-inline bool operator<=(const end_point &val1, const end_point &val2) {
+inline bool operator<=(const cluster_node &val1, const cluster_node &val2) {
return val1.compare(val2) <= 0;
}
@@ -121,7 +148,7 @@ inline bool operator<=(const end_point &val1, const
end_point &val2) {
* @param val2 Second value.
* @return True if greater.
*/
-inline bool operator>(const end_point &val1, const end_point &val2) {
+inline bool operator>(const cluster_node &val1, const cluster_node &val2) {
return val1.compare(val2) > 0;
}
@@ -132,8 +159,8 @@ inline bool operator>(const end_point &val1, const
end_point &val2) {
* @param val2 Second value.
* @return True if greater or equal.
*/
-inline bool operator>=(const end_point &val1, const end_point &val2) {
+inline bool operator>=(const cluster_node &val1, const cluster_node &val2) {
return val1.compare(val2) >= 0;
}
-} // namespace ignite::network
+} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/client/primitive.h
b/modules/platforms/cpp/ignite/client/primitive.h
index 667fbb233f..f9ecd689f4 100644
--- a/modules/platforms/cpp/ignite/client/primitive.h
+++ b/modules/platforms/cpp/ignite/client/primitive.h
@@ -29,6 +29,7 @@
#include "ignite/common/uuid.h"
#include <cstdint>
+#include <optional>
#include <type_traits>
#include <variant>
#include <vector>
@@ -43,6 +44,16 @@ public:
// Default
primitive() = default;
+ /**
+ * Null constructor.
+ */
+ primitive(std::nullptr_t) {} // NOLINT(google-explicit-constructor)
+
+ /**
+ * Null option constructor.
+ */
+ primitive(std::nullopt_t) {} // NOLINT(google-explicit-constructor)
+
/**
* Constructor for boolean value.
*
@@ -237,12 +248,23 @@ public:
}
}
+ /**
+ * Check whether element is null.
+ *
+ * @return Value indicating whether element is null.
+ */
+ [[nodiscard]] bool is_null() const noexcept { return m_value.index() == 0;
}
+
/**
* Get primitive type.
*
* @return Primitive type.
*/
- [[nodiscard]] column_type get_type() const { return
static_cast<column_type>(m_value.index()); }
+ [[nodiscard]] column_type get_type() const noexcept {
+ if (is_null())
+ return column_type::UNDEFINED;
+ return static_cast<column_type>(m_value.index() - 1);
+ }
/**
* @brief Comparison operator.
@@ -271,7 +293,8 @@ private:
typedef void *unsupported_type;
/** Value type. */
- typedef std::variant<bool, // Bool = 0
+ typedef std::variant<std::nullptr_t,
+ bool, // Bool = 0
std::int8_t, // Int8 = 1
std::int16_t, // Int16 = 2
std::int32_t, // Int32 = 3
diff --git a/modules/platforms/cpp/ignite/client/primitive_test.cpp
b/modules/platforms/cpp/ignite/client/primitive_test.cpp
index 46a549f832..b71b3033e3 100644
--- a/modules/platforms/cpp/ignite/client/primitive_test.cpp
+++ b/modules/platforms/cpp/ignite/client/primitive_test.cpp
@@ -23,11 +23,12 @@ using namespace ignite;
template<typename T>
void check_primitive_type(column_type expected) {
- primitive val_bool(T{});
- EXPECT_EQ(val_bool.get_type(), expected);
+ primitive val(T{});
+ EXPECT_EQ(val.get_type(), expected);
}
TEST(primitive, get_column_type) {
+ check_primitive_type<nullptr_t>(column_type::UNDEFINED);
check_primitive_type<bool>(column_type::BOOLEAN);
check_primitive_type<int8_t>(column_type::INT8);
check_primitive_type<int16_t>(column_type::INT16);
@@ -46,3 +47,15 @@ TEST(primitive, get_column_type) {
check_primitive_type<std::vector<std::byte>>(column_type::BYTE_ARRAY);
check_primitive_type<big_integer>(column_type::NUMBER);
}
+
+TEST(primitive, null_value_by_nullptr) {
+ primitive val(nullptr);
+ EXPECT_EQ(val.get_type(), column_type::UNDEFINED);
+ EXPECT_TRUE(val.is_null());
+}
+
+TEST(primitive, null_value_by_nullopt) {
+ primitive val(std::nullopt);
+ EXPECT_EQ(val.get_type(), column_type::UNDEFINED);
+ EXPECT_TRUE(val.is_null());
+}
diff --git a/modules/platforms/cpp/ignite/client/table/key_value_view.cpp
b/modules/platforms/cpp/ignite/client/table/key_value_view.cpp
index ccd229041b..bb54ac2992 100644
--- a/modules/platforms/cpp/ignite/client/table/key_value_view.cpp
+++ b/modules/platforms/cpp/ignite/client/table/key_value_view.cpp
@@ -16,30 +16,11 @@
*/
#include "ignite/client/table/key_value_view.h"
+#include "ignite/client/detail/argument_check_utils.h"
#include "ignite/client/detail/table/table_impl.h"
namespace ignite {
-/**
- * Check key argument.
- *
- * @param key Key tuple.
- */
-void inline check_key_argument(const ignite_tuple &key) {
- if (0 == key.column_count())
- throw ignite_error("Key tuple can not be empty");
-}
-
-/**
- * Check value argument.
- *
- * @param value Value tuple.
- */
-void inline check_value_argument(const ignite_tuple &value) {
- if (0 == value.column_count())
- throw ignite_error("Value tuple can not be empty");
-}
-
/**
* Process multiple kv pairs by uniting key and value part of the tuple
* to a single record.
@@ -58,15 +39,15 @@ std::vector<ignite_tuple> concat_records(const
std::vector<std::pair<ignite_tupl
void key_value_view<ignite_tuple, ignite_tuple>::get_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<value_type>> callback) {
- check_key_argument(key);
+ detail::arg_check::key_tuple_non_empty(key);
m_impl->get_async(tx, key, std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::put_async(
transaction *tx, const key_type &key, const value_type &value,
ignite_callback<void> callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->upsert_async(tx, detail::concat(key, value), std::move(callback));
}
@@ -83,7 +64,7 @@ void key_value_view<ignite_tuple,
ignite_tuple>::get_all_async(
void key_value_view<ignite_tuple, ignite_tuple>::contains_async(
transaction *tx, const ignite_tuple &key, ignite_callback<bool> callback) {
- check_key_argument(key);
+ detail::arg_check::key_tuple_non_empty(key);
m_impl->contains_async(tx, key, std::move(callback));
}
@@ -100,31 +81,31 @@ void key_value_view<ignite_tuple,
ignite_tuple>::put_all_async(
void key_value_view<ignite_tuple, ignite_tuple>::get_and_put_async(transaction
*tx, const key_type &key,
const value_type &value, ignite_callback<std::optional<value_type>>
callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->get_and_upsert_async(tx, detail::concat(key, value),
std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::put_if_absent_async(
transaction *tx, const key_type &key, const value_type &value,
ignite_callback<bool> callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->insert_async(tx, detail::concat(key, value), std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::remove_async(
transaction *tx, const ignite_tuple &key, ignite_callback<bool> callback) {
- check_key_argument(key);
+ detail::arg_check::key_tuple_non_empty(key);
m_impl->remove_async(tx, key, std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::remove_async(
transaction *tx, const key_type &key, const value_type &value,
ignite_callback<bool> callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->remove_exact_async(tx, detail::concat(key, value),
std::move(callback));
}
@@ -151,32 +132,32 @@ void key_value_view<ignite_tuple,
ignite_tuple>::remove_all_async(transaction *t
void key_value_view<ignite_tuple, ignite_tuple>::get_and_remove_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<value_type>> callback) {
- check_key_argument(key);
+ detail::arg_check::key_tuple_non_empty(key);
m_impl->get_and_remove_async(tx, key, std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::replace_async(
transaction *tx, const key_type &key, const value_type &value,
ignite_callback<bool> callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->replace_async(tx, detail::concat(key, value), std::move(callback));
}
void key_value_view<ignite_tuple, ignite_tuple>::replace_async(transaction
*tx, const key_type &key,
const value_type &old_value, const value_type &new_value,
ignite_callback<bool> callback) {
- check_key_argument(key);
- check_value_argument(old_value);
- check_value_argument(new_value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(old_value);
+ detail::arg_check::value_tuple_non_empty(new_value);
m_impl->replace_async(tx, detail::concat(key, old_value),
detail::concat(key, new_value), std::move(callback));
}
void key_value_view<ignite_tuple,
ignite_tuple>::get_and_replace_async(transaction *tx, const key_type &key,
const value_type &value, ignite_callback<std::optional<value_type>>
callback) {
- check_key_argument(key);
- check_value_argument(value);
+ detail::arg_check::key_tuple_non_empty(key);
+ detail::arg_check::value_tuple_non_empty(value);
m_impl->get_and_replace_async(tx, detail::concat(key, value),
std::move(callback));
}
diff --git a/modules/platforms/cpp/ignite/client/table/record_view.cpp
b/modules/platforms/cpp/ignite/client/table/record_view.cpp
index 392b115525..7b6236b161 100644
--- a/modules/platforms/cpp/ignite/client/table/record_view.cpp
+++ b/modules/platforms/cpp/ignite/client/table/record_view.cpp
@@ -16,22 +16,21 @@
*/
#include "ignite/client/table/record_view.h"
+#include "ignite/client/detail/argument_check_utils.h"
#include "ignite/client/detail/table/table_impl.h"
namespace ignite {
void record_view<ignite_tuple>::get_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<value_type>> callback) {
- if (0 == key.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(key, "Tuple");
m_impl->get_async(tx, key, std::move(callback));
}
void record_view<ignite_tuple>::upsert_async(
transaction *tx, const ignite_tuple &record, ignite_callback<void>
callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->upsert_async(tx, record, std::move(callback));
}
@@ -58,16 +57,14 @@ void record_view<ignite_tuple>::upsert_all_async(
void record_view<ignite_tuple>::get_and_upsert_async(
transaction *tx, const ignite_tuple &record,
ignite_callback<std::optional<value_type>> callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->get_and_upsert_async(tx, record, std::move(callback));
}
void record_view<ignite_tuple>::insert_async(
transaction *tx, const ignite_tuple &record, ignite_callback<bool>
callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->insert_async(tx, record, std::move(callback));
}
@@ -84,47 +81,42 @@ void record_view<ignite_tuple>::insert_all_async(
void record_view<ignite_tuple>::replace_async(
transaction *tx, const ignite_tuple &record, ignite_callback<bool>
callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->replace_async(tx, record, std::move(callback));
}
void record_view<ignite_tuple>::replace_async(
transaction *tx, const ignite_tuple &record, const ignite_tuple
&new_record, ignite_callback<bool> callback) {
- if (0 == record.column_count() || 0 == new_record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
+ detail::arg_check::tuple_non_empty(new_record, "Tuple");
m_impl->replace_async(tx, record, new_record, std::move(callback));
}
void record_view<ignite_tuple>::get_and_replace_async(
transaction *tx, const ignite_tuple &record,
ignite_callback<std::optional<value_type>> callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->get_and_replace_async(tx, record, std::move(callback));
}
void record_view<ignite_tuple>::remove_async(transaction *tx, const
ignite_tuple &key, ignite_callback<bool> callback) {
- if (0 == key.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(key, "Tuple");
m_impl->remove_async(tx, key, std::move(callback));
}
void record_view<ignite_tuple>::remove_exact_async(
transaction *tx, const ignite_tuple &record, ignite_callback<bool>
callback) {
- if (0 == record.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(record, "Tuple");
m_impl->remove_exact_async(tx, record, std::move(callback));
}
void record_view<ignite_tuple>::get_and_remove_async(
transaction *tx, const ignite_tuple &key,
ignite_callback<std::optional<value_type>> callback) {
- if (0 == key.column_count())
- throw ignite_error("Tuple can not be empty");
+ detail::arg_check::tuple_non_empty(key, "Tuple");
m_impl->get_and_remove_async(tx, key, std::move(callback));
}
diff --git a/modules/platforms/cpp/ignite/client/table/table.h
b/modules/platforms/cpp/ignite/client/table/table.h
index 4d71cb94f6..d946d93fb0 100644
--- a/modules/platforms/cpp/ignite/client/table/table.h
+++ b/modules/platforms/cpp/ignite/client/table/table.h
@@ -38,6 +38,7 @@ class tables_impl;
* Table view.
*/
class table {
+ friend class detail::table_impl;
friend class detail::tables_impl;
public:
diff --git a/modules/platforms/cpp/ignite/common/CMakeLists.txt
b/modules/platforms/cpp/ignite/common/CMakeLists.txt
index 0f2af03db0..1c5a786c28 100644
--- a/modules/platforms/cpp/ignite/common/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/common/CMakeLists.txt
@@ -27,6 +27,7 @@ set(PUBLIC_HEADERS
bytes.h
bytes_view.h
config.h
+ end_point.h
ignite_date.h
ignite_date_time.h
ignite_error.h
diff --git a/modules/platforms/cpp/ignite/network/end_point.h
b/modules/platforms/cpp/ignite/common/end_point.h
similarity index 99%
rename from modules/platforms/cpp/ignite/network/end_point.h
rename to modules/platforms/cpp/ignite/common/end_point.h
index 22b0ce67b0..3e895987ab 100644
--- a/modules/platforms/cpp/ignite/network/end_point.h
+++ b/modules/platforms/cpp/ignite/common/end_point.h
@@ -67,7 +67,7 @@ struct end_point {
std::string host;
/** TCP port. */
- uint16_t port = 0;
+ uint16_t port{0};
};
/**
diff --git a/modules/platforms/cpp/ignite/network/async_handler.h
b/modules/platforms/cpp/ignite/network/async_handler.h
index 06d5b52fac..9a64c22035 100644
--- a/modules/platforms/cpp/ignite/network/async_handler.h
+++ b/modules/platforms/cpp/ignite/network/async_handler.h
@@ -17,9 +17,9 @@
#pragma once
+#include "ignite/common/end_point.h"
#include <ignite/common/ignite_error.h>
#include <ignite/network/data_buffer.h>
-#include <ignite/network/end_point.h>
#include <cstdint>
#include <optional>
diff --git
a/modules/platforms/cpp/ignite/network/detail/linux/connecting_context.h
b/modules/platforms/cpp/ignite/network/detail/linux/connecting_context.h
index b8ff6f640d..6cd9e9098d 100644
--- a/modules/platforms/cpp/ignite/network/detail/linux/connecting_context.h
+++ b/modules/platforms/cpp/ignite/network/detail/linux/connecting_context.h
@@ -17,10 +17,9 @@
#pragma once
-#include "linux_async_client.h"
-
-#include <ignite/network/end_point.h>
-#include <ignite/network/tcp_range.h>
+#include "ignite/common/end_point.h"
+#include "ignite/network/detail/linux/linux_async_client.h"
+#include "ignite/network/tcp_range.h"
#include <cstdint>
#include <memory>
diff --git
a/modules/platforms/cpp/ignite/network/detail/linux/linux_async_client.h
b/modules/platforms/cpp/ignite/network/detail/linux/linux_async_client.h
index 6f100766b1..4b771f38ce 100644
--- a/modules/platforms/cpp/ignite/network/detail/linux/linux_async_client.h
+++ b/modules/platforms/cpp/ignite/network/detail/linux/linux_async_client.h
@@ -19,10 +19,10 @@
#include "sockets.h"
-#include <ignite/network/async_handler.h>
-#include <ignite/network/codec.h>
-#include <ignite/network/end_point.h>
-#include <ignite/network/tcp_range.h>
+#include "ignite/common/end_point.h"
+#include "ignite/network/async_handler.h"
+#include "ignite/network/codec.h"
+#include "ignite/network/tcp_range.h"
#include <cstdint>
#include <deque>
diff --git
a/modules/platforms/cpp/ignite/network/detail/linux/linux_async_worker_thread.h
b/modules/platforms/cpp/ignite/network/detail/linux/linux_async_worker_thread.h
index df441581d0..845c26ab3e 100644
---
a/modules/platforms/cpp/ignite/network/detail/linux/linux_async_worker_thread.h
+++
b/modules/platforms/cpp/ignite/network/detail/linux/linux_async_worker_thread.h
@@ -17,12 +17,11 @@
#pragma once
-#include "connecting_context.h"
-#include "linux_async_client.h"
-
-#include <ignite/network/async_handler.h>
-#include <ignite/network/end_point.h>
-#include <ignite/network/tcp_range.h>
+#include "ignite/common/end_point.h"
+#include "ignite/network/async_handler.h"
+#include "ignite/network/detail/linux/connecting_context.h"
+#include "ignite/network/detail/linux/linux_async_client.h"
+#include "ignite/network/tcp_range.h"
#include <cstdint>
#include <ctime>
diff --git a/modules/platforms/cpp/ignite/network/detail/win/win_async_client.h
b/modules/platforms/cpp/ignite/network/detail/win/win_async_client.h
index 10e4de0599..b27235216f 100644
--- a/modules/platforms/cpp/ignite/network/detail/win/win_async_client.h
+++ b/modules/platforms/cpp/ignite/network/detail/win/win_async_client.h
@@ -19,10 +19,10 @@
#include "sockets.h"
-#include <ignite/network/async_handler.h>
-#include <ignite/network/codec.h>
-#include <ignite/network/end_point.h>
-#include <ignite/network/tcp_range.h>
+#include "ignite/common/end_point.h"
+#include "ignite/network/async_handler.h"
+#include "ignite/network/codec.h"
+#include "ignite/network/tcp_range.h"
#include <cstdint>
#include <deque>
diff --git a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
index 90b43f2ad3..2d9611905f 100644
--- a/modules/platforms/cpp/tests/client-test/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/client-test/CMakeLists.txt
@@ -20,6 +20,7 @@ project(ignite-client-test)
set(TARGET ${PROJECT_NAME})
set(SOURCES
+ compute_test.cpp
gtest_logger.h
ignite_client_test.cpp
ignite_runner_suite.h
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
new file mode 100644
index 0000000000..91d14489e0
--- /dev/null
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite_runner_suite.h"
+
+#include "ignite/client/ignite_client.h"
+#include "ignite/client/ignite_client_configuration.h"
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <limits>
+
+using namespace ignite;
+
+/**
+ * Test suite.
+ */
+class compute_test : public ignite_runner_suite {
+protected:
+ void SetUp() override {
+ ignite_client_configuration cfg{get_node_addrs()};
+ cfg.set_logger(get_logger());
+
+ m_client = ignite_client::start(cfg, std::chrono::seconds(30));
+ }
+
+ void TearDown() override {
+ // remove all
+ }
+
+ /**
+ * Get specific node.
+ * @param id Node id.
+ * @return Node.
+ */
+ cluster_node get_node(size_t id) {
+ auto nodes = m_client.get_cluster_nodes();
+ std::sort(
+ nodes.begin(), nodes.end(), [](const auto &n1, const auto &n2) {
return n1.get_name() < n2.get_name(); });
+ return nodes[id];
+ }
+
+ /**
+ * Get nodes as set.
+ * @return Nodes in set.
+ */
+ std::set<cluster_node> get_node_set() {
+ auto nodes = m_client.get_cluster_nodes();
+ return {nodes.begin(), nodes.end()};
+ }
+
+ /**
+ * Check that passed argument returned in a specific string form.
+ *
+ * @tparam T Type of the argument.
+ * @param value Argument.
+ * @param expected_str Expected string form.
+ */
+ template<typename T>
+ void check_argument(T value, const std::string &expected_str) {
+ auto cluster_nodes = m_client.get_cluster_nodes();
+ auto result = m_client.get_compute().execute(cluster_nodes, ECHO_JOB,
{value, expected_str});
+
+ ASSERT_TRUE(result.has_value());
+ EXPECT_EQ(result.value().template get<T>(), value);
+ }
+
+ /**
+ * Check that passed argument returned in an expected string form.
+ *
+ * @tparam T Type of the argument.
+ * @param value Argument.
+ */
+ template<typename T>
+ void check_argument(T value) {
+ check_argument(std::move(value), std::to_string(value));
+ }
+
+ /** Ignite client. */
+ ignite_client m_client;
+};
+
+TEST_F(compute_test, get_cluster_nodes) {
+ auto cluster_nodes = m_client.get_cluster_nodes();
+
+ std::sort(cluster_nodes.begin(), cluster_nodes.end(),
+ [](const auto &n1, const auto &n2) { return n1.get_name() <
n2.get_name(); });
+
+ ASSERT_EQ(4, cluster_nodes.size());
+
+ EXPECT_FALSE(cluster_nodes[0].get_id().empty());
+ EXPECT_FALSE(cluster_nodes[1].get_id().empty());
+
+ EXPECT_EQ(3344, cluster_nodes[0].get_address().port);
+ EXPECT_EQ(3345, cluster_nodes[1].get_address().port);
+
+ EXPECT_FALSE(cluster_nodes[0].get_address().host.empty());
+ EXPECT_FALSE(cluster_nodes[1].get_address().host.empty());
+
+ EXPECT_EQ(cluster_nodes[0].get_address().host,
cluster_nodes[1].get_address().host);
+}
+
+TEST_F(compute_test, execute_on_random_node) {
+ auto cluster_nodes = m_client.get_cluster_nodes();
+
+ auto result = m_client.get_compute().execute(cluster_nodes, NODE_NAME_JOB,
{});
+
+ ASSERT_TRUE(result.has_value());
+ EXPECT_THAT(result.value().get<std::string>(),
::testing::StartsWith(PLATFORM_TEST_NODE_RUNNER));
+}
+
+TEST_F(compute_test, execute_on_specific_node) {
+ auto res1 = m_client.get_compute().execute({get_node(0)}, NODE_NAME_JOB,
{"-", 11});
+ auto res2 = m_client.get_compute().execute({get_node(1)}, NODE_NAME_JOB,
{":", 22});
+
+ ASSERT_TRUE(res1.has_value());
+ ASSERT_TRUE(res2.has_value());
+
+ EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER +
"-_11");
+ EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER +
"_2:_22");
+}
+
+TEST_F(compute_test, execute_broadcast_one_node) {
+ auto res = m_client.get_compute().broadcast({get_node(1)}, NODE_NAME_JOB,
{"42"});
+
+ ASSERT_EQ(res.size(), 1);
+
+ EXPECT_EQ(res.begin()->first, get_node(1));
+
+ ASSERT_TRUE(res.begin()->second.has_value());
+ EXPECT_EQ(res.begin()->second.value(), PLATFORM_TEST_NODE_RUNNER + "_242");
+}
+
+TEST_F(compute_test, execute_broadcast_all_nodes) {
+ auto res = m_client.get_compute().broadcast(get_node_set(), NODE_NAME_JOB,
{"42"});
+
+ ASSERT_EQ(res.size(), 4);
+
+ EXPECT_EQ(res[get_node(0)].value(), get_node(0).get_name() + "42");
+ EXPECT_EQ(res[get_node(1)].value(), get_node(1).get_name() + "42");
+ EXPECT_EQ(res[get_node(2)].value(), get_node(2).get_name() + "42");
+ EXPECT_EQ(res[get_node(3)].value(), get_node(3).get_name() + "42");
+}
+
+TEST_F(compute_test, execute_with_args) {
+ auto cluster_nodes = m_client.get_cluster_nodes();
+
+ auto result = m_client.get_compute().execute(cluster_nodes, CONCAT_JOB,
{5.3, uuid(), "42", nullptr});
+
+ ASSERT_TRUE(result.has_value());
+ EXPECT_EQ(result.value().get<std::string>(),
"5.3_00000000-0000-0000-0000-000000000000_42_null");
+}
+
+TEST_F(compute_test, job_error_propagates_to_client) {
+ auto cluster_nodes = m_client.get_cluster_nodes();
+
+ EXPECT_THROW(
+ {
+ try {
+ m_client.get_compute().execute(cluster_nodes, ERROR_JOB,
{"unused"});
+ } catch (const ignite_error &e) {
+ EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job
error"));
+ EXPECT_THAT(e.what_str(),
+ testing::HasSubstr(
+
"org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$CustomException"));
+ EXPECT_THAT(e.what_str(), testing::HasSubstr("IGN-TBL-3"));
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+TEST_F(compute_test, unknown_node_throws) {
+ auto unknown_node = cluster_node("some", "random", {"127.0.0.1", 1234});
+
+ EXPECT_THROW(
+ {
+ try {
+ m_client.get_compute().execute({unknown_node}, ECHO_JOB,
{"unused"});
+ } catch (const ignite_error &e) {
+ EXPECT_THAT(e.what_str(), testing::HasSubstr("Specified node
is not present in the cluster: random"));
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+TEST_F(compute_test, all_arg_types) {
+ check_argument<std::int8_t>(42);
+ check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::max());
+ check_argument<std::int8_t>(std::numeric_limits<std::int8_t>::min());
+
+ check_argument<std::int16_t>(4242);
+ check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::max());
+ check_argument<std::int16_t>(std::numeric_limits<std::int16_t>::min());
+
+ check_argument<std::int32_t>(424242);
+ check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::max());
+ check_argument<std::int32_t>(std::numeric_limits<std::int32_t>::min());
+
+ check_argument<std::int64_t>(424242424242);
+ check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::max());
+ check_argument<std::int64_t>(std::numeric_limits<std::int64_t>::min());
+
+ check_argument<float>(0.123456f);
+ check_argument<float>(std::numeric_limits<float>::max(), "3.4028235E38");
+ check_argument<float>(std::numeric_limits<float>::min(), "1.17549435E-38");
+
+ check_argument<double>(0.987654);
+ check_argument<double>(std::numeric_limits<double>::max(),
"1.7976931348623157E308");
+ check_argument<double>(std::numeric_limits<double>::min(),
"2.2250738585072014E-308");
+
+ check_argument<big_decimal>({123456, 3}, "123.456");
+ check_argument<big_decimal>({}, "0");
+ check_argument<big_decimal>({1, 0}, "1");
+
+ auto str_dec = "12345678909876543211234567890.987654321";
+ check_argument<big_decimal>(big_decimal(str_dec), str_dec);
+
+ check_argument<ignite_date>({2021, 11, 18}, "2021-11-18");
+ check_argument<ignite_time>({13, 8, 55, 266574889}, "13:08:55.266574889");
+ check_argument<ignite_date_time>({{2021, 11, 18}, {13, 8, 55, 266574889}},
"2021-11-18T13:08:55.266574889");
+
+ check_argument<uuid>({0, 0}, "00000000-0000-0000-0000-000000000000");
+ check_argument<uuid>({0x123e4567e89b12d3, 0x7456426614174000},
"123e4567-e89b-12d3-7456-426614174000");
+}
+
+TEST_F(compute_test, execute_colocated) {
+ std::map<std::int32_t, std::string> nodes_for_values = {{1, "_4"}, {5,
"_2"}, {9, "_3"}, {10, ""}, {11, "_2"}};
+
+ for (const auto &var : nodes_for_values) {
+ SCOPED_TRACE("key=" + std::to_string(var.first) + ", node=" +
var.second);
+ auto key = get_tuple(var.first);
+
+ auto resNodeName = m_client.get_compute().execute_colocated(TABLE_1,
key, NODE_NAME_JOB, {});
+ auto expectedNodeName = PLATFORM_TEST_NODE_RUNNER + var.second;
+
+ EXPECT_EQ(expectedNodeName, resNodeName);
+ }
+}
+
+TEST_F(compute_test, execute_colocated_throws_when_table_does_not_exist) {
+ EXPECT_THROW(
+ {
+ try {
+ (void)
m_client.get_compute().execute_colocated("unknownTable", get_tuple(42),
ECHO_JOB, {});
+ } catch (const ignite_error &e) {
+ EXPECT_STREQ("Table does not exist: 'unknownTable'", e.what());
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+TEST_F(compute_test, execute_colocated_throws_when_key_column_is_missing) {
+ EXPECT_THROW(
+ {
+ try {
+ (void) m_client.get_compute().execute_colocated(TABLE_1,
get_tuple("some"), ECHO_JOB, {});
+ } catch (const ignite_error &e) {
+ EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Missed key
column: KEY"));
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+TEST_F(compute_test, execute_colocated_throws_when_key_is_empty) {
+ EXPECT_THROW(
+ {
+ try {
+ (void) m_client.get_compute().execute_colocated(TABLE_1, {},
ECHO_JOB, {});
+ } catch (const ignite_error &e) {
+ EXPECT_EQ("Key tuple can not be empty", e.what_str());
+ throw;
+ }
+ },
+ ignite_error);
+}
+
+TEST_F(compute_test, exception_in_server_job_propogates_to_client) {
+ EXPECT_THROW(
+ {
+ try {
+ (void) m_client.get_compute().execute_colocated(TABLE_1, {},
ECHO_JOB, {});
+ } catch (const ignite_error &e) {
+ EXPECT_EQ("Key tuple can not be empty", e.what_str());
+ throw;
+ }
+ },
+ ignite_error);
+}
diff --git a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
index eba5d5e926..288a0f2bae 100644
--- a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
+++ b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
@@ -43,6 +43,17 @@ public:
static constexpr std::string_view TABLE_1 = "tbl1"sv;
static constexpr std::string_view TABLE_NAME_ALL_COLUMNS =
"tbl_all_columns"sv;
+ inline static const std::string PLATFORM_TEST_NODE_RUNNER =
+ "org.apache.ignite.internal.runner.app.PlatformTestNodeRunner";
+
+ inline static const std::string IT_THIN_CLIENT_COMPUTE_TEST =
+ "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest";
+
+ inline static const std::string NODE_NAME_JOB =
IT_THIN_CLIENT_COMPUTE_TEST + "$NodeNameJob";
+ inline static const std::string CONCAT_JOB = IT_THIN_CLIENT_COMPUTE_TEST +
"$ConcatJob";
+ inline static const std::string ERROR_JOB = IT_THIN_CLIENT_COMPUTE_TEST +
"$IgniteExceptionJob";
+ inline static const std::string ECHO_JOB = IT_THIN_CLIENT_COMPUTE_TEST +
"$EchoJob";
+
static constexpr const char *KEY_COLUMN = "key";
static constexpr const char *VAL_COLUMN = "val";
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index d71548cf61..fe00808716 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Internal
{
using System.Collections.Generic;
+ using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
using Ignite.Compute;
@@ -85,6 +86,9 @@ namespace Apache.Ignite.Internal
for (var i = 0; i < count; i++)
{
+ var fieldCount = r.ReadArrayHeader();
+ Debug.Assert(fieldCount == 4, "fieldCount == 4");
+
res.Add(new ClusterNode(
Id: r.ReadString(),
Name: r.ReadString(),