IGNITE-5576: Added Compute::Run() for C++ (cherry picked from commit 80c95ff79f344daf1fca3f094733a24bac2a218d)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29d532e8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29d532e8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29d532e8 Branch: refs/heads/ignite-2.1 Commit: 29d532e8be971ccac40ece00fc84a6a6bffdad0f Parents: ad42f62 Author: Igor Sapego <[email protected]> Authored: Wed Jul 5 18:51:27 2017 +0300 Committer: Igor Sapego <[email protected]> Committed: Wed Jul 5 18:51:58 2017 +0300 ---------------------------------------------------------------------- .../core-test/config/cache-query-default.xml | 18 ++ .../cpp/core-test/src/compute_test.cpp | 176 +++++++++++++++++++ .../cpp/core/include/ignite/compute/compute.h | 35 +++- .../include/ignite/impl/compute/compute_impl.h | 42 +++++ .../ignite/impl/compute/compute_job_holder.h | 73 ++++++++ .../ignite/impl/compute/compute_job_result.h | 112 ++++++++++++ .../ignite/impl/compute/compute_task_holder.h | 85 +++++++++ 7 files changed, 539 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/config/cache-query-default.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/config/cache-query-default.xml b/modules/platforms/cpp/core-test/config/cache-query-default.xml index 38636e5..16f601d 100644 --- a/modules/platforms/cpp/core-test/config/cache-query-default.xml +++ b/modules/platforms/cpp/core-test/config/cache-query-default.xml @@ -94,6 +94,12 @@ <property name="atomicityMode" value="TRANSACTIONAL"/> <property name="writeSynchronizationMode" value="FULL_SYNC"/> + <property name="affinity"> + <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"> + <property name="partitions" value="256"/> + </bean> + </property> + <property name="queryEntities"> <list> <bean class="org.apache.ignite.cache.QueryEntity"> @@ -115,6 +121,12 @@ <property name="atomicityMode" value="TRANSACTIONAL"/> <property name="writeSynchronizationMode" value="FULL_SYNC"/> + <property name="affinity"> + <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"> + <property name="partitions" value="256"/> + </bean> + </property> + <!-- Configure type metadata to enable queries. --> <property name="queryEntities"> <list> @@ -132,6 +144,12 @@ <property name="atomicityMode" value="TRANSACTIONAL"/> <property name="writeSynchronizationMode" value="FULL_SYNC"/> + <property name="affinity"> + <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction"> + <property name="partitions" value="256"/> + </bean> + </property> + <!-- Configure type metadata to enable queries. --> <property name="queryEntities"> <list> http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core-test/src/compute_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp b/modules/platforms/cpp/core-test/src/compute_test.cpp index d3b1183..8c57ef1 100644 --- a/modules/platforms/cpp/core-test/src/compute_test.cpp +++ b/modules/platforms/cpp/core-test/src/compute_test.cpp @@ -146,6 +146,49 @@ struct Func2 : ComputeFunc<std::string> IgniteError err; }; +struct Func3 : ComputeFunc<void> +{ + Func3() : + a(), b(), err() + { + // No-op. + } + + Func3(int32_t a, int32_t b) : + a(a), b(b), err() + { + // No-op. + } + + Func3(IgniteError err) : + a(), b(), err(err) + { + // No-op. + } + + virtual void Call() + { + boost::this_thread::sleep_for(boost::chrono::milliseconds(200)); + + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + throw err; + + std::stringstream tmp; + + tmp << a << '.' << b; + + res = tmp.str(); + } + + int32_t a; + int32_t b; + IgniteError err; + + static std::string res; +}; + +std::string Func3::res; + namespace ignite { namespace binary @@ -235,6 +278,49 @@ namespace ignite dst.err = reader.ReadObject<IgniteError>("err"); } }; + + template<> + struct BinaryType<Func3> + { + static int32_t GetTypeId() + { + return GetBinaryStringHashCode("Func3"); + } + + static void GetTypeName(std::string& dst) + { + dst = "Func3"; + } + + static int32_t GetFieldId(const char* name) + { + return GetBinaryStringHashCode(name); + } + + static bool IsNull(const Func3& obj) + { + return false; + } + + static void GetNull(Func3& dst) + { + dst = Func3(0, 0); + } + + static void Write(BinaryWriter& writer, const Func3& obj) + { + writer.WriteInt32("a", obj.a); + writer.WriteInt32("b", obj.b); + writer.WriteObject<IgniteError>("err", obj.err); + } + + static void Read(BinaryReader& reader, Func3& dst) + { + dst.a = reader.ReadInt32("a"); + dst.b = reader.ReadInt32("b"); + dst.err = reader.ReadObject<IgniteError>("err"); + } + }; } } @@ -244,6 +330,7 @@ IGNITE_EXPORTED_CALL void IgniteModuleInit1(IgniteBindingContext& context) binding.RegisterComputeFunc<Func1>(); binding.RegisterComputeFunc<Func2>(); + binding.RegisterComputeFunc<Func3>(); } BOOST_FIXTURE_TEST_SUITE(ComputeTestSuite, ComputeTestSuiteFixture) @@ -334,4 +421,93 @@ BOOST_AUTO_TEST_CASE(IgniteCallTestRemoteError) BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); } +BOOST_AUTO_TEST_CASE(IgniteRunSyncLocal) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + compute.Run(Func3(8, 5)); + + BOOST_CHECK_EQUAL(Func3::res, "8.5"); +} + +BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocal) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + Future<void> res = compute.RunAsync(Func3(312, 245)); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + res.GetValue(); + + BOOST_CHECK_EQUAL(Func3::res, "312.245"); +} + +BOOST_AUTO_TEST_CASE(IgniteRunSyncLocalError) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + + BOOST_CHECK_EXCEPTION(compute.Run(Func3(MakeTestError())), IgniteError, IsTestError); +} + +BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + Future<void> res = compute.RunAsync(Func3(MakeTestError())); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); +} + +BOOST_AUTO_TEST_CASE(IgniteRunTestRemote) +{ + Ignite node2 = MakeNode("ComputeNode2"); + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + compute.CallAsync<std::string>(Func2(8, 5)); + + compute.Run(Func3(42, 24)); + + BOOST_CHECK_EQUAL(Func3::res, "42.24"); +} + +BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError) +{ + Ignite node2 = MakeNode("ComputeNode2"); + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Running"); + compute.CallAsync<std::string>(Func2(8, 5)); + + Future<void> res = compute.RunAsync(Func3(MakeTestError())); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); +} + + BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/compute/compute.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h index b079569..75c8c85 100644 --- a/modules/platforms/cpp/core/include/ignite/compute/compute.h +++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h @@ -94,7 +94,7 @@ namespace ignite * @tparam R Call return type. BinaryType should be specialized for * the type if it is not primitive. Should not be void. For * non-returning methods see Compute::Run(). - * @tparam F Compute function type. Should implement ComputeFunc + * @tparam F Compute function type. Should implement ComputeFunc<R> * class. * @param func Compute function to call. * @return Computation result. @@ -113,7 +113,7 @@ namespace ignite * @tparam R Call return type. BinaryType should be specialized for * the type if it is not primitive. Should not be void. For * non-returning methods see Compute::Run(). - * @tparam F Compute function type. Should implement ComputeFunc + * @tparam F Compute function type. Should implement ComputeFunc<R> * class. * @param func Compute function to call. * @return Future that can be used to access computation result once @@ -126,6 +126,37 @@ namespace ignite return impl.Get()->CallAsync<R, F>(func); } + /** + * Runs provided ComputeFunc on a node within the underlying cluster + * group. + * + * @tparam F Compute function type. Should implement ComputeFunc<void> + * class. + * @param action Compute function to call. + * @throw IgniteError in case of error. + */ + template<typename F> + void Run(const F& action) + { + return impl.Get()->RunAsync<F>(action).GetValue(); + } + + /** + * Asyncronuously runs provided ComputeFunc on a node within the + * underlying cluster group. + * + * @tparam F Compute function type. Should implement ComputeFunc<void> + * class. + * @param action Compute function to call. + * @return Future that can be used to wait for action to complete. + * @throw IgniteError in case of error. + */ + template<typename F> + Future<void> RunAsync(const F& action) + { + return impl.Get()->RunAsync<F>(action); + } + private: /** Implementation. */ common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl; http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h index 389c571..63f9a46 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h @@ -108,6 +108,48 @@ namespace ignite return promise.GetFuture(); } + /** + * Asyncronuously runs provided ComputeFunc on a node within + * the underlying cluster group. + * + * @tparam F Compute action type. Should implement ComputeAction + * class. + * @param action Compute action to call. + * @return Future that can be used to wait for action to complete. + * @throw IgniteError in case of error. + */ + template<typename F> + Future<void> RunAsync(const F& action) + { + common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); + interop::InteropOutputStream out(mem.Get()); + binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + + common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, void>(action)); + + int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job); + + ComputeTaskHolderImpl<F, void>* taskPtr = new ComputeTaskHolderImpl<F, void>(jobHandle); + common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr); + + int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task); + + writer.WriteInt64(taskHandle); + writer.WriteInt32(1); + writer.WriteInt64(jobHandle); + writer.WriteObject<F>(action); + + out.Synchronize(); + + jobject target = InStreamOutObject(Operation::Unicast, *mem.Get()); + std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target)); + + common::Promise<void>& promise = taskPtr->GetPromise(); + promise.SetCancelTarget(cancelable); + + return promise.GetFuture(); + } + private: IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl); }; http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h index e218e36..9f35a11 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h @@ -132,6 +132,79 @@ namespace ignite /** Job. */ JobType job; }; + + /** + * Compute job holder. Internal class. + * Specialisation for void return type + * + * @tparam F Actual job type. + */ + template<typename F> + class ComputeJobHolderImpl<F, void> : public ComputeJobHolder + { + public: + typedef F JobType; + + /** + * Constructor. + * + * @param job Job. + */ + ComputeJobHolderImpl(JobType job) : + job(job) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~ComputeJobHolderImpl() + { + // No-op. + } + + const ComputeJobResult<void>& GetResult() + { + return res; + } + + virtual void ExecuteLocal() + { + try + { + job.Call(); + res.SetResult(); + } + catch (const IgniteError& err) + { + res.SetError(err); + } + catch (const std::exception& err) + { + res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what())); + } + catch (...) + { + res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, + "Unknown error occurred during call.")); + } + } + + virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) + { + ExecuteLocal(); + + res.Write(writer); + } + + private: + /** Result. */ + ComputeJobResult<void> res; + + /** Job. */ + JobType job; + }; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h index 5bcb762..0874522 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h @@ -27,6 +27,8 @@ #include <sstream> #include <ignite/common/promise.h> +#include <ignite/impl/binary/binary_reader_impl.h> +#include <ignite/impl/binary/binary_writer_impl.h> namespace ignite { @@ -154,6 +156,116 @@ namespace ignite /** Erorr. */ IgniteError err; }; + + /** + * Used to hold compute job result. + */ + template<> + class ComputeJobResult<void> + { + public: + /** + * Default constructor. + */ + ComputeJobResult() : + err() + { + // No-op. + } + + /** + * Mark as complete. + */ + void SetResult() + { + err = IgniteError(); + } + + /** + * Set error. + * + * @param error Error to set. + */ + void SetError(const IgniteError error) + { + err = error; + } + + /** + * Set promise to a state which corresponds to result. + * + * @param promise Promise, which state to set. + */ + void SetPromise(common::Promise<void>& promise) + { + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + promise.SetError(err); + else + promise.SetValue(); + } + + /** + * Write using writer. + * + * @param writer Writer. + */ + void Write(binary::BinaryWriterImpl& writer) + { + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + { + // Fail + writer.WriteBool(false); + + // Native Exception + writer.WriteBool(true); + + writer.WriteObject<IgniteError>(err); + } + else + { + // Success + writer.WriteBool(true); + + writer.WriteNull(); + } + } + + /** + * Read using reader. + * + * @param reader Reader. + */ + void Read(binary::BinaryReaderImpl& reader) + { + bool success = reader.ReadBool(); + + if (success) + err = IgniteError(); + else + { + bool native = reader.ReadBool(); + + if (native) + err = reader.ReadObject<IgniteError>(); + else + { + std::stringstream buf; + + buf << reader.ReadObject<std::string>() << " : "; + buf << reader.ReadObject<std::string>() << ", "; + buf << reader.ReadObject<std::string>(); + + std::string msg = buf.str(); + + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str()); + } + } + } + + private: + /** Erorr. */ + IgniteError err; + }; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/29d532e8/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h index bdd7513..f627f27 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h @@ -206,6 +206,91 @@ namespace ignite /** Task result promise. */ common::Promise<ResultType> promise; }; + + /** + * Compute task holder type-specific implementation. + */ + template<typename F> + class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder + { + public: + typedef F JobType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + ComputeTaskHolderImpl(int64_t handle) : + ComputeTaskHolder(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~ComputeTaskHolderImpl() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); + + res = job0.GetResult(); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + res.Read(reader); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise<void>& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult<void> res; + + /** Task result promise. */ + common::Promise<void> promise; + }; } } }
