IGNITE-2156 .Net: Added ClientDisconnectedException to API. This closes #397.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acaeafb8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acaeafb8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acaeafb8 Branch: refs/heads/ignite-1232 Commit: acaeafb84ca46a402ccb75d59620d197a7c549fe Parents: 8562b00 Author: vozerov-gridgain <[email protected]> Authored: Wed Feb 17 16:11:36 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Feb 17 16:11:36 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformProcessorImpl.java | 17 ++- .../callback/PlatformCallbackGateway.java | 30 +++++ .../callback/PlatformCallbackUtils.java | 15 +++ .../cpp/common/include/ignite/common/java.h | 9 ++ modules/platforms/cpp/common/src/java.cpp | 16 ++- .../Apache.Ignite.Core.Tests.csproj | 2 + .../Process/IgniteProcess.cs | 16 +++ .../ProcessExtensions.cs | 78 +++++++++++++ .../Apache.Ignite.Core.Tests/ReconnectTest.cs | 96 ++++++++++++++++ .../Apache.Ignite.Core.csproj | 1 + .../Apache.Ignite.Core/Cluster/ICluster.cs | 14 +++ .../Common/ClientDisconnectedException.cs | 97 ++++++++++++++++ .../Impl/Binary/BinaryUtils.cs | 2 +- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 2 +- .../Impl/Compute/ComputeTaskHolder.cs | 2 +- .../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 110 +++++++++++-------- .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 32 +++++- .../Apache.Ignite.Core/Impl/IgniteProxy.cs | 7 ++ .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 3 + .../Impl/Unmanaged/UnmanagedCallbacks.cs | 32 +++++- 20 files changed, 524 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index d0e0a63..76967ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -27,7 +27,9 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.PlatformConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteComputeImpl; -import org.apache.ignite.internal.binary.*; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; @@ -53,6 +55,7 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformConfiguratio import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; import java.util.Collection; @@ -379,6 +382,18 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + platformCtx.gateway().onClientDisconnected(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + platformCtx.gateway().onClientReconnected(clusterRestarted); + + return null; + } + + /** {@inheritDoc} */ @Override public void getIgniteConfiguration(long memPtr) { PlatformOutputStream stream = platformCtx.memory().get(memPtr).output(); BinaryRawWriterEx writer = platformCtx.writer(stream); http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 47862a2..5093773 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -911,6 +911,36 @@ public class PlatformCallbackGateway { } /** + * Notifies platform about client disconnect. + */ + public void onClientDisconnected() { + enter(); + + try { + PlatformCallbackUtils.onClientDisconnected(envPtr); + } + finally { + leave(); + } + } + + /** + * Notifies platform about client reconnect. + * + * @param clusterRestarted Cluster restarted flag. + */ + public void onClientReconnected(boolean clusterRestarted) { + enter(); + + try { + PlatformCallbackUtils.onClientReconnected(envPtr, clusterRestarted); + } + finally { + leave(); + } + } + + /** * Kernal stop callback. */ public void onStop() { http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 3112e0f..f7d6586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -481,6 +481,21 @@ public class PlatformCallbackUtils { static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2); /** + * Notifies platform about client disconnect. + * + * @param envPtr Environment pointer. + */ + static native void onClientDisconnected(long envPtr); + + /** + * Notifies platform about client reconnect. + * + * @param envPtr Environment pointer. + * @param clusterRestarted Cluster restarted flag. + */ + static native void onClientReconnected(long envPtr, boolean clusterRestarted); + + /** * Private constructor. */ private PlatformCallbackUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/cpp/common/include/ignite/common/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h index 8f5823e..ed47bc3 100644 --- a/modules/platforms/cpp/common/include/ignite/common/java.h +++ b/modules/platforms/cpp/common/include/ignite/common/java.h @@ -103,6 +103,9 @@ namespace ignite typedef long long(JNICALL *ExtensionCallbackInLongOutLongHandler)(void* target, int typ, long long arg1); typedef long long(JNICALL *ExtensionCallbackInLongLongOutLongHandler)(void* target, int typ, long long arg1, long long arg2); + typedef void(JNICALL *OnClientDisconnectedHandler)(void* target); + typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted); + /** * JNI handlers holder. */ @@ -177,6 +180,9 @@ namespace ignite ExtensionCallbackInLongOutLongHandler extensionCallbackInLongOutLong; ExtensionCallbackInLongLongOutLongHandler extensionCallbackInLongLongOutLong; + + OnClientDisconnectedHandler onClientDisconnected; + OnClientReconnectedHandler onClientReconnected; }; /** @@ -727,6 +733,9 @@ namespace ignite JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1); JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2); + + JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr); + JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/cpp/common/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp index d6f7ef0..8fc2293 100644 --- a/modules/platforms/cpp/common/src/java.cpp +++ b/modules/platforms/cpp/common/src/java.cpp @@ -352,6 +352,9 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true); JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG = JniMethod("extensionCallbackInLongLongOutLong", "(JIJJ)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true); + const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils"; JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true); JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true); @@ -808,7 +811,7 @@ namespace ignite void RegisterNatives(JNIEnv* env) { { - JNINativeMethod methods[52]; + JNINativeMethod methods[54]; int idx = 0; @@ -882,6 +885,9 @@ namespace ignite AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG, reinterpret_cast<void*>(JniExtensionCallbackInLongOutLong)); AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG, reinterpret_cast<void*>(JniExtensionCallbackInLongLongOutLong)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED, reinterpret_cast<void*>(JniOnClientDisconnected)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED, reinterpret_cast<void*>(JniOnClientReconnected)); + jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx); if (res != JNI_OK) @@ -2716,6 +2722,14 @@ namespace ignite JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2) { IGNITE_SAFE_FUNC(env, envPtr, ExtensionCallbackInLongLongOutLongHandler, extensionCallbackInLongLongOutLong, typ, arg1, arg2); } + + JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr) { + IGNITE_SAFE_PROC_NO_ARG(env, envPtr, OnClientDisconnectedHandler, onClientDisconnected); + } + + JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted) { + IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index fb14ed5..6f0e630 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -129,6 +129,8 @@ <Compile Include="MessagingTest.cs" /> <Compile Include="BinaryConfigurationTest.cs" /> <Compile Include="Binary\BinaryStructureTest.cs" /> + <Compile Include="ProcessExtensions.cs" /> + <Compile Include="ReconnectTest.cs" /> <Compile Include="SerializationTest.cs" /> <Compile Include="IgniteStartStopTest.cs" /> <Compile Include="TestUtils.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Process/IgniteProcess.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Process/IgniteProcess.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Process/IgniteProcess.cs index 4853d93..85464e9 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Process/IgniteProcess.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Process/IgniteProcess.cs @@ -225,6 +225,22 @@ namespace Apache.Ignite.Core.Tests.Process } /// <summary> + /// Suspends the process. + /// </summary> + public void Suspend() + { + _proc.Suspend(); + } + + /// <summary> + /// Resumes the process. + /// </summary> + public void Resume() + { + _proc.Resume(); + } + + /// <summary> /// Join process. /// </summary> /// <returns>Exit code.</returns> http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs new file mode 100644 index 0000000..b4c0a27 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProcessExtensions.cs @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests +{ + using System; + using System.Diagnostics; + using System.Linq; + using System.Runtime.InteropServices; + + /// <summary> + /// Process extensions. + /// </summary> + public static class ProcessExtensions + { + /** */ + private const int ThreadAccessSuspendResume = 0x2; + + /** */ + [DllImport("kernel32.dll")] + private static extern IntPtr OpenThread(int dwDesiredAccess, bool bInheritHandle, uint dwThreadId); + + /** */ + [DllImport("kernel32.dll")] + private static extern uint SuspendThread(IntPtr hThread); + + /** */ + [DllImport("kernel32.dll")] + private static extern int ResumeThread(IntPtr hThread); + + /// <summary> + /// Suspends the specified process. + /// </summary> + /// <param name="process">The process.</param> + public static void Suspend(this System.Diagnostics.Process process) + { + foreach (var thread in process.Threads.Cast<ProcessThread>()) + { + var pOpenThread = OpenThread(ThreadAccessSuspendResume, false, (uint)thread.Id); + + if (pOpenThread == IntPtr.Zero) + break; + + SuspendThread(pOpenThread); + } + } + /// <summary> + /// Resumes the specified process. + /// </summary> + /// <param name="process">The process.</param> + public static void Resume(this System.Diagnostics.Process process) + { + foreach (var thread in process.Threads.Cast<ProcessThread>()) + { + var pOpenThread = OpenThread(ThreadAccessSuspendResume, false, (uint)thread.Id); + + if (pOpenThread == IntPtr.Zero) + break; + + ResumeThread(pOpenThread); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ReconnectTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ReconnectTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ReconnectTest.cs new file mode 100644 index 0000000..5cb0a4f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ReconnectTest.cs @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma warning disable 618 // Deprecated SpringConfigUrl +namespace Apache.Ignite.Core.Tests +{ + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Tests.Process; + using NUnit.Framework; + + /// <summary> + /// Client reconnect tests. + /// </summary> + public class ReconnectTest + { + /// <summary> + /// Tests the disconnected exception. + /// </summary> + [Test] + public void TestDisconnectedException() + { + var cfg = new IgniteConfiguration + { + SpringConfigUrl = "config\\compute\\compute-grid1.xml", + JvmClasspath = TestUtils.CreateTestClasspath(), + JvmOptions = TestUtils.TestJavaOptions() + }; + + var proc = StartServerProcess(cfg); + + Ignition.ClientMode = true; + + using (var ignite = Ignition.Start(cfg)) + { + Assert.IsTrue(ignite.GetCluster().ClientReconnectTask.IsCompleted); + + var cache = ignite.GetCache<int, int>(null); + + cache[1] = 1; + + // Suspend external process to cause disconnect + proc.Suspend(); + + var ex = Assert.Throws<CacheException>(() => cache.Get(1)); + + var inner = (ClientDisconnectedException) ex.InnerException; + + var clientReconnectTask = inner.ClientReconnectTask; + + Assert.AreEqual(ignite.GetCluster().ClientReconnectTask, clientReconnectTask); + + // Resume process to reconnect + proc.Resume(); + + clientReconnectTask.Wait(); + + Assert.AreEqual(1, cache[1]); + } + } + + /// <summary> + /// Starts the server process. + /// </summary> + private static IgniteProcess StartServerProcess(IgniteConfiguration cfg) + { + return new IgniteProcess( + "-springConfigUrl=" + cfg.SpringConfigUrl, "-J-ea", "-J-Xcheck:jni", "-J-Xms512m", "-J-Xmx512m", + "-J-DIGNITE_QUIET=false"); + } + + /// <summary> + /// Fixture tear down. + /// </summary> + [TestFixtureTearDown] + public void FixtureTearDown() + { + IgniteProcess.KillAll(); + Ignition.ClientMode = false; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index d0ef352..661040b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -109,6 +109,7 @@ <Compile Include="Common\IFactory.cs" /> <Compile Include="Cache\Store\ICacheStoreSession.cs" /> <Compile Include="Cache\Store\Package-Info.cs" /> + <Compile Include="Common\ClientDisconnectedException.cs" /> <Compile Include="Cluster\ClusterGroupEmptyException.cs" /> <Compile Include="Cluster\ClusterTopologyException.cs" /> <Compile Include="Cluster\ICluster.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs index e50970b..812a644 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cluster/ICluster.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Cluster using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; + using System.Threading.Tasks; using Apache.Ignite.Core.Common; /// <summary> @@ -75,5 +76,18 @@ namespace Apache.Ignite.Core.Cluster /// Resets local I/O, job, and task execution metrics. /// </summary> void ResetMetrics(); + + /// <summary> + /// Gets the reconnect task, which will transition to Completed state + /// when local client node reconnects to the cluster. + /// <para /> + /// Result of the task indicates whether cluster has been restarted. + /// <para /> + /// If local node is not in client mode or is not disconnected, returns completed task. + /// </summary> + /// <value> + /// The reconnect task. + /// </value> + Task<bool> ClientReconnectTask { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Common/ClientDisconnectedException.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Common/ClientDisconnectedException.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Common/ClientDisconnectedException.cs new file mode 100644 index 0000000..8843a0b --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Common/ClientDisconnectedException.cs @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Common +{ + using System; + using System.Diagnostics.CodeAnalysis; + using System.Runtime.Serialization; + using System.Threading.Tasks; + + /// <summary> + /// Indicates that client-mode local node has been disconnected from the cluster. + /// </summary> + [SuppressMessage("Microsoft.Usage", "CA2240:ImplementISerializableCorrectly", + Justification = "No need to implement GetObjectData because there are no custom fields.")] + [Serializable] + public sealed class ClientDisconnectedException : IgniteException + { + /// <summary> + /// The client reconnect task. + /// </summary> + private readonly Task<bool> _clientReconnectTask; + + /// <summary> + /// Initializes a new instance of the <see cref="ClientDisconnectedException"/> class. + /// </summary> + public ClientDisconnectedException() + { + // No-op. + } + + /// <summary> + /// Initializes a new instance of the <see cref="ClientDisconnectedException"/> class. + /// </summary> + /// <param name="message">The message that describes the error.</param> + public ClientDisconnectedException(string message) : base(message) + { + // No-op. + } + + /// <summary> + /// Initializes a new instance of the <see cref="ClientDisconnectedException"/> class. + /// </summary> + /// <param name="message">The message.</param> + /// <param name="cause">The cause.</param> + public ClientDisconnectedException(string message, Exception cause) : base(message, cause) + { + // No-op. + } + + /// <summary> + /// Initializes a new instance of the <see cref="ClientDisconnectedException" /> class. + /// </summary> + /// <param name="message">The message.</param> + /// <param name="cause">The cause.</param> + /// <param name="clientReconnectTask">The client reconnect task.</param> + public ClientDisconnectedException(string message, Exception cause, Task<bool> clientReconnectTask) : base(message, cause) + { + _clientReconnectTask = clientReconnectTask; + } + + /// <summary> + /// Initializes a new instance of the <see cref="ClientDisconnectedException"/> class. + /// </summary> + /// <param name="info">Serialization information.</param> + /// <param name="ctx">Streaming context.</param> + private ClientDisconnectedException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) + { + // No-op. + } + + /// <summary> + /// Gets the client reconnect task, if present. + /// </summary> + /// <value> + /// The client reconnect task, or null. + /// </value> + public Task<bool> ClientReconnectTask + { + get { return _clientReconnectTask; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs index 9066bd1..b73a6c4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs @@ -1658,7 +1658,7 @@ namespace Apache.Ignite.Core.Impl.Binary err = reader.ReadBoolean() ? reader.ReadObject<object>() - : ExceptionUtils.GetException(reader.ReadString(), reader.ReadString()); + : ExceptionUtils.GetException(reader.Marshaller.Ignite, reader.ReadString(), reader.ReadString()); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index d1296ec..1296596 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -1170,7 +1170,7 @@ namespace Apache.Ignite.Core.Impl.Cache var msg = Unmarshal<string>(inStream); - return new CacheEntryProcessorException(ExceptionUtils.GetException(clsName, msg)); + return new CacheEntryProcessorException(ExceptionUtils.GetException(_ignite, clsName, msg)); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs index e992245..a7988c5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs @@ -391,7 +391,7 @@ namespace Apache.Ignite.Core.Impl.Compute { err = reader.ReadBoolean() ? reader.ReadObject<BinaryObject>().Deserialize<Exception>() - : ExceptionUtils.GetException(reader.ReadString(), reader.ReadString()); + : ExceptionUtils.GetException(_compute.Marshaller.Ignite, reader.ReadString(), reader.ReadString()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs index 4d2e458..695f156 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Security; + using System.Text.RegularExpressions; using System.Threading; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Store; @@ -44,102 +45,119 @@ namespace Apache.Ignite.Core.Impl /** InteropCachePartialUpdateException. */ private const string ClsCachePartialUpdateErr = "org.apache.ignite.internal.processors.platform.cache.PlatformCachePartialUpdateException"; - + /** Map with predefined exceptions. */ private static readonly IDictionary<string, ExceptionFactoryDelegate> Exs = new Dictionary<string, ExceptionFactoryDelegate>(); /** Exception factory delegate. */ - private delegate Exception ExceptionFactoryDelegate(string msg); - + private delegate Exception ExceptionFactoryDelegate(IIgnite ignite, string msg, Exception innerEx); + + /** Inner class regex. */ + private static readonly Regex InnerClassRegex = new Regex(@"class ([^\s]+): (.*)", RegexOptions.Compiled); + /// <summary> /// Static initializer. /// </summary> - [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", + [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline", Justification = "Readability")] static ExceptionUtils() { // Common Java exceptions mapped to common .Net exceptions. - Exs["java.lang.IllegalArgumentException"] = m => new ArgumentException(m); - Exs["java.lang.IllegalStateException"] = m => new InvalidOperationException(m); - Exs["java.lang.UnsupportedOperationException"] = m => new NotImplementedException(m); - Exs["java.lang.InterruptedException"] = m => new ThreadInterruptedException(m); - + Exs["java.lang.IllegalArgumentException"] = (i, m, e) => new ArgumentException(m, e); + Exs["java.lang.IllegalStateException"] = (i, m, e) => new InvalidOperationException(m, e); + Exs["java.lang.UnsupportedOperationException"] = (i, m, e) => new NotImplementedException(m, e); + Exs["java.lang.InterruptedException"] = (i, m, e) => new ThreadInterruptedException(m, e); + // Generic Ignite exceptions. - Exs["org.apache.ignite.IgniteException"] = m => new IgniteException(m); - Exs["org.apache.ignite.IgniteCheckedException"] = m => new IgniteException(m); + Exs["org.apache.ignite.IgniteException"] = (i, m, e) => new IgniteException(m, e); + Exs["org.apache.ignite.IgniteCheckedException"] = (i, m, e) => new IgniteException(m, e); + Exs["org.apache.ignite.IgniteClientDisconnectedException"] = (i, m, e) => new ClientDisconnectedException(m, e, i.GetCluster().ClientReconnectTask); + Exs["org.apache.ignite.internal.IgniteClientDisconnectedCheckedException"] = (i, m, e) => new ClientDisconnectedException(m, e, i.GetCluster().ClientReconnectTask); // Cluster exceptions. - Exs["org.apache.ignite.cluster.ClusterGroupEmptyException"] = m => new ClusterGroupEmptyException(m); - Exs["org.apache.ignite.cluster.ClusterTopologyException"] = m => new ClusterTopologyException(m); + Exs["org.apache.ignite.cluster.ClusterGroupEmptyException"] = (i, m, e) => new ClusterGroupEmptyException(m, e); + Exs["org.apache.ignite.cluster.ClusterTopologyException"] = (i, m, e) => new ClusterTopologyException(m, e); // Compute exceptions. - Exs["org.apache.ignite.compute.ComputeExecutionRejectedException"] = m => new ComputeExecutionRejectedException(m); - Exs["org.apache.ignite.compute.ComputeJobFailoverException"] = m => new ComputeJobFailoverException(m); - Exs["org.apache.ignite.compute.ComputeTaskCancelledException"] = m => new ComputeTaskCancelledException(m); - Exs["org.apache.ignite.compute.ComputeTaskTimeoutException"] = m => new ComputeTaskTimeoutException(m); - Exs["org.apache.ignite.compute.ComputeUserUndeclaredException"] = m => new ComputeUserUndeclaredException(m); + Exs["org.apache.ignite.compute.ComputeExecutionRejectedException"] = (i, m, e) => new ComputeExecutionRejectedException(m, e); + Exs["org.apache.ignite.compute.ComputeJobFailoverException"] = (i, m, e) => new ComputeJobFailoverException(m, e); + Exs["org.apache.ignite.compute.ComputeTaskCancelledException"] = (i, m, e) => new ComputeTaskCancelledException(m, e); + Exs["org.apache.ignite.compute.ComputeTaskTimeoutException"] = (i, m, e) => new ComputeTaskTimeoutException(m, e); + Exs["org.apache.ignite.compute.ComputeUserUndeclaredException"] = (i, m, e) => new ComputeUserUndeclaredException(m, e); // Cache exceptions. - Exs["javax.cache.CacheException"] = m => new CacheException(m); - Exs["javax.cache.integration.CacheLoaderException"] = m => new CacheStoreException(m); - Exs["javax.cache.integration.CacheWriterException"] = m => new CacheStoreException(m); - Exs["javax.cache.processor.EntryProcessorException"] = m => new CacheEntryProcessorException(m); - Exs["org.apache.ignite.cache.CacheAtomicUpdateTimeoutException"] = m => new CacheAtomicUpdateTimeoutException(m); - + Exs["javax.cache.CacheException"] = (i, m, e) => new CacheException(m, e); + Exs["javax.cache.integration.CacheLoaderException"] = (i, m, e) => new CacheStoreException(m, e); + Exs["javax.cache.integration.CacheWriterException"] = (i, m, e) => new CacheStoreException(m, e); + Exs["javax.cache.processor.EntryProcessorException"] = (i, m, e) => new CacheEntryProcessorException(m, e); + Exs["org.apache.ignite.cache.CacheAtomicUpdateTimeoutException"] = (i, m, e) => new CacheAtomicUpdateTimeoutException(m, e); + // Transaction exceptions. - Exs["org.apache.ignite.transactions.TransactionOptimisticException"] = m => new TransactionOptimisticException(m); - Exs["org.apache.ignite.transactions.TransactionTimeoutException"] = m => new TransactionTimeoutException(m); - Exs["org.apache.ignite.transactions.TransactionRollbackException"] = m => new TransactionRollbackException(m); - Exs["org.apache.ignite.transactions.TransactionHeuristicException"] = m => new TransactionHeuristicException(m); + Exs["org.apache.ignite.transactions.TransactionOptimisticException"] = (i, m, e) => new TransactionOptimisticException(m, e); + Exs["org.apache.ignite.transactions.TransactionTimeoutException"] = (i, m, e) => new TransactionTimeoutException(m, e); + Exs["org.apache.ignite.transactions.TransactionRollbackException"] = (i, m, e) => new TransactionRollbackException(m, e); + Exs["org.apache.ignite.transactions.TransactionHeuristicException"] = (i, m, e) => new TransactionHeuristicException(m, e); // Security exceptions. - Exs["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m); - Exs["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m); + Exs["org.apache.ignite.IgniteAuthenticationException"] = (i, m, e) => new SecurityException(m, e); + Exs["org.apache.ignite.plugin.security.GridSecurityException"] = (i, m, e) => new SecurityException(m, e); // Future exceptions - Exs["org.apache.ignite.lang.IgniteFutureCancelledException"] = m => new IgniteFutureCancelledException(m); - Exs["org.apache.ignite.internal.IgniteFutureCancelledCheckedException"] = m => new IgniteFutureCancelledException(m); + Exs["org.apache.ignite.lang.IgniteFutureCancelledException"] = (i, m, e) => new IgniteFutureCancelledException(m, e); + Exs["org.apache.ignite.internal.IgniteFutureCancelledCheckedException"] = (i, m, e) => new IgniteFutureCancelledException(m, e); } /// <summary> /// Creates exception according to native code class and message. /// </summary> + /// <param name="ignite">The ignite.</param> /// <param name="clsName">Exception class name.</param> /// <param name="msg">Exception message.</param> /// <param name="reader">Error data reader.</param> - public static Exception GetException(string clsName, string msg, BinaryReader reader = null) + /// <returns>Exception.</returns> + public static Exception GetException(IIgnite ignite, string clsName, string msg, BinaryReader reader = null) { ExceptionFactoryDelegate ctor; if (Exs.TryGetValue(clsName, out ctor)) - return ctor(msg); + { + var match = InnerClassRegex.Match(msg); - if (ClsNoClsDefFoundErr.Equals(clsName)) + ExceptionFactoryDelegate innerCtor; + + if (match.Success && Exs.TryGetValue(match.Groups[1].Value, out innerCtor)) + return ctor(ignite, msg, innerCtor(ignite, match.Groups[2].Value, null)); + + return ctor(ignite, msg, null); + } + + if (ClsNoClsDefFoundErr.Equals(clsName, StringComparison.OrdinalIgnoreCase)) return new IgniteException("Java class is not found (did you set IGNITE_HOME environment " + "variable?): " + msg); - if (ClsNoSuchMthdErr.Equals(clsName)) + if (ClsNoSuchMthdErr.Equals(clsName, StringComparison.OrdinalIgnoreCase)) return new IgniteException("Java class method is not found (did you set IGNITE_HOME environment " + "variable?): " + msg); - if (ClsCachePartialUpdateErr.Equals(clsName)) - return ProcessCachePartialUpdateException(msg, reader); - + if (ClsCachePartialUpdateErr.Equals(clsName, StringComparison.OrdinalIgnoreCase)) + return ProcessCachePartialUpdateException(ignite, msg, reader); + return new IgniteException("Java exception occurred [class=" + clsName + ", message=" + msg + ']'); } /// <summary> /// Process cache partial update exception. /// </summary> + /// <param name="ignite">The ignite.</param> /// <param name="msg">Message.</param> /// <param name="reader">Reader.</param> - /// <returns></returns> + /// <returns>CachePartialUpdateException.</returns> [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - private static Exception ProcessCachePartialUpdateException(string msg, BinaryReader reader) + private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, BinaryReader reader) { if (reader == null) return new CachePartialUpdateException(msg, new IgniteException("Failed keys are not available.")); - + bool dataExists = reader.ReadBoolean(); Debug.Assert(dataExists); @@ -160,12 +178,12 @@ namespace Apache.Ignite.Core.Impl return new CachePartialUpdateException(msg, e); } } - + // Was not able to write keys. string innerErrCls = reader.ReadString(); string innerErrMsg = reader.ReadString(); - Exception innerErr = GetException(innerErrCls, innerErrMsg); + Exception innerErr = GetException(ignite, innerErrCls, innerErrMsg); return new CachePartialUpdateException(msg, innerErr); } @@ -179,7 +197,7 @@ namespace Apache.Ignite.Core.Impl public static Exception GetJvmInitializeException(string clsName, string msg) { if (clsName != null) - return new IgniteException("Failed to initialize JVM.", GetException(clsName, msg)); + return new IgniteException("Failed to initialize JVM.", GetException(null, clsName, msg)); if (msg != null) return new IgniteException("Failed to initialize JVM: " + msg); @@ -194,7 +212,7 @@ namespace Apache.Ignite.Core.Impl /// <returns>List.</returns> private static List<object> ReadNullableList(BinaryReader reader) { - if (!reader.ReadBoolean()) + if (!reader.ReadBoolean()) return null; var size = reader.ReadInt(); http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index be21d7f..0271fa2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; @@ -86,10 +87,13 @@ namespace Apache.Ignite.Core.Impl private readonly UnmanagedCallbacks _cbs; /** Node info cache. */ - private readonly ConcurrentDictionary<Guid, ClusterNodeImpl> _nodes = new ConcurrentDictionary<Guid, ClusterNodeImpl>(); + /** Client reconnect task completion source. */ + private volatile TaskCompletionSource<bool> _clientReconnectTaskCompletionSource = + new TaskCompletionSource<bool>(); + /// <summary> /// Constructor. /// </summary> @@ -128,6 +132,9 @@ namespace Apache.Ignite.Core.Impl // Grid is not completely started here, can't initialize interop transactions right away. _transactions = new Lazy<TransactionsImpl>( () => new TransactionsImpl(UU.ProcessorTransactions(proc), marsh, GetLocalNode().Id)); + + // Set reconnected task to completed state for convenience. + _clientReconnectTaskCompletionSource.SetResult(false); } /// <summary> @@ -429,6 +436,12 @@ namespace Apache.Ignite.Core.Impl } /** <inheritdoc /> */ + public Task<bool> ClientReconnectTask + { + get { return _clientReconnectTaskCompletionSource.Task; } + } + + /** <inheritdoc /> */ public IDataStreamer<TK, TV> GetDataStreamer<TK, TV>(string cacheName) { return new DataStreamerImpl<TK, TV>(UU.ProcessorDataStreamer(_proc, cacheName, false), @@ -630,5 +643,22 @@ namespace Apache.Ignite.Core.Impl { get { return _proc; } } + + /// <summary> + /// Called when local client node has been disconnected from the cluster. + /// </summary> + public void OnClientDisconnected() + { + _clientReconnectTaskCompletionSource = new TaskCompletionSource<bool>(); + } + + /// <summary> + /// Called when local client node has been reconnected to the cluster. + /// </summary> + /// <param name="clusterRestarted">Cluster restarted flag.</param> + public void OnClientReconnected(bool clusterRestarted) + { + _clientReconnectTaskCompletionSource.TrySetResult(clusterRestarted); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs index a303783..0aa55fb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; @@ -285,6 +286,12 @@ namespace Apache.Ignite.Core.Impl } /** <inheritdoc /> */ + public Task<bool> ClientReconnectTask + { + get { return _ignite.GetCluster().ClientReconnectTask; } + } + + /** <inheritdoc /> */ public IDataStreamer<TK, TV> GetDataStreamer<TK, TV>(string cacheName) { return _ignite.GetDataStreamer<TK, TV>(cacheName); http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs index 8147e9d..fb52033 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs @@ -95,5 +95,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged internal void* extensionCbInLongOutLong; internal void* extensionCbInLongLongOutLong; + + internal void* onClientDisconnected; + internal void* ocClientReconnected; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/acaeafb8/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index 7778484..8d810e3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -162,6 +162,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1); private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2); + private delegate void OnClientDisconnectedDelegate(void* target); + private delegate void OnClientReconnectedDelegate(void* target, bool clusterRestarted); + /// <summary> /// constructor. /// </summary> @@ -241,7 +244,10 @@ namespace Apache.Ignite.Core.Impl.Unmanaged error = CreateFunctionPointer((ErrorCallbackDelegate)Error), extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong), - extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong) + extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong), + + onClientDisconnected = CreateFunctionPointer((OnClientDisconnectedDelegate)OnClientDisconnected), + ocClientReconnected = CreateFunctionPointer((OnClientReconnectedDelegate)OnClientReconnected), }; _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize()); @@ -728,7 +734,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged string errCls = reader.ReadString(); string errMsg = reader.ReadString(); - Exception err = ExceptionUtils.GetException(errCls, errMsg, reader); + Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, reader); ProcessFuture(futPtr, fut => { fut.OnError(err); }); } @@ -1043,10 +1049,10 @@ namespace Apache.Ignite.Core.Impl.Unmanaged // Stream disposal intentionally omitted: IGNITE-1598 var stream = new PlatformRawMemory(errData, errDataLen).GetStream(); - throw ExceptionUtils.GetException(errCls, errMsg, _ignite.Marshaller.StartUnmarshal(stream)); + throw ExceptionUtils.GetException(_ignite, errCls, errMsg, _ignite.Marshaller.StartUnmarshal(stream)); } - throw ExceptionUtils.GetException(errCls, errMsg); + throw ExceptionUtils.GetException(_ignite, errCls, errMsg); case ErrJvmInit: throw ExceptionUtils.GetJvmInitializeException(errCls, errMsg); @@ -1059,8 +1065,24 @@ namespace Apache.Ignite.Core.Impl.Unmanaged } } + private void OnClientDisconnected(void* target) + { + SafeCall(() => + { + _ignite.OnClientDisconnected(); + }); + } + + private void OnClientReconnected(void* target, bool clusterRestarted) + { + SafeCall(() => + { + _ignite.OnClientReconnected(clusterRestarted); + }); + } + #endregion - + #region HELPERS [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
