// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
// new file mode 100644
// index 0000000..de5d4c7
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Resource/ResourceTypeDescriptor.cs
// @@ -0,0 +1,291 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Resource
{
    using System;
    using System.Collections.Generic;
    using System.Reflection;
    using Apache.Ignite.Core.Cache.Store;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Compute;
    using Apache.Ignite.Core.Resource;

    /// <summary>
    /// Describes the resource injection points of one type. The type hierarchy is scanned in the
    /// constructor for fields, properties and methods marked with a resource attribute; the injectors
    /// built during that scan are used by the <c>Inject*</c> methods.
    /// </summary>
    internal class ResourceTypeDescriptor
    {
        /** Attribute type: InstanceResourceAttribute. */
        private static readonly Type TypAttrIgnite = typeof(InstanceResourceAttribute);

        /** Attribute type: StoreSessionResourceAttribute. */
        private static readonly Type TypAttrStoreSes = typeof(StoreSessionResourceAttribute);

        /** Resource type: IIgnite. */
        private static readonly Type TypIgnite = typeof(IIgnite);

        /** Resource type: ICacheStoreSession. */
        private static readonly Type TypStoreSes = typeof(ICacheStoreSession);

        /** Attribute type: ComputeTaskNoResultCacheAttribute. */
        private static readonly Type TypComputeTaskNoResCache = typeof(ComputeTaskNoResultCacheAttribute);

        /** Member lookup flags: public and non-public instance members declared on the inspected type only. */
        private static readonly BindingFlags Flags = BindingFlags.Instance | BindingFlags.Public |
            BindingFlags.NonPublic | BindingFlags.DeclaredOnly;

        /** Injectors for members marked with InstanceResourceAttribute; null when there are none. */
        private readonly IList<IResourceInjector> _igniteInjectors;

        /** Injectors for members marked with StoreSessionResourceAttribute; null when there are none. */
        private readonly IList<IResourceInjector> _storeSesInjectors;

        /** Task "no result cache" flag. */
        private readonly bool _taskNoResCache;

        /// <summary>
        /// Builds the descriptor for the given type.
        /// </summary>
        /// <param name="type">Type to describe.</param>
        /// <exception cref="IgniteException">
        /// A member marked with a resource attribute cannot accept the resource (wrong type, read-only
        /// property, or method whose parameter count is not one).
        /// </exception>
        internal ResourceTypeDescriptor(Type type)
        {
            var igniteCollector = new Collector(TypAttrIgnite, TypIgnite);
            var storeSesCollector = new Collector(TypAttrStoreSes, TypStoreSes);

            // Flags contains DeclaredOnly, so every level of the hierarchy has to be visited explicitly,
            // starting with the most derived type.
            for (var cur = type; cur != null; cur = cur.BaseType)
                CreateInjectors(cur, igniteCollector, storeSesCollector);

            _igniteInjectors = igniteCollector.Injectors;
            _storeSesInjectors = storeSesCollector.Injectors;

            _taskNoResCache = ContainsAttribute(type, TypComputeTaskNoResCache, true);
        }

        /// <summary>
        /// Task "no result cache" flag: true when <see cref="ComputeTaskNoResultCacheAttribute"/> was found
        /// on the described type (the lookup is performed with the inherit flag set).
        /// </summary>
        public bool TaskNoResultCache
        {
            get { return _taskNoResCache; }
        }

        /// <summary>
        /// Injects the proxy of the given Ignite instance into the target.
        /// </summary>
        /// <param name="target">Object that receives the resource.</param>
        /// <param name="ignite">Ignite instance whose <c>Proxy</c> is injected.</param>
        public void InjectIgnite(object target, Ignite ignite)
        {
            InjectIgnite(target, ignite.Proxy);
        }

        /// <summary>
        /// Injects the given Ignite proxy into the target.
        /// </summary>
        /// <param name="target">Object that receives the resource.</param>
        /// <param name="igniteProxy">Ignite proxy to inject.</param>
        public void InjectIgnite(object target, IgniteProxy igniteProxy)
        {
            Inject0(target, igniteProxy, _igniteInjectors);
        }

        /// <summary>
        /// Injects the given store session into the target.
        /// </summary>
        /// <param name="target">Object that receives the resource.</param>
        /// <param name="ses">Store session to inject.</param>
        public void InjectStoreSession(object target, ICacheStoreSession ses)
        {
            Inject0(target, ses, _storeSesInjectors);
        }

        /// <summary>
        /// Runs every injector of the list against the target.
        /// </summary>
        /// <param name="target">Object that receives the resource.</param>
        /// <param name="injectee">Resource to inject.</param>
        /// <param name="injectors">Injectors to run; null means nothing to inject.</param>
        private static void Inject0(object target, object injectee, ICollection<IResourceInjector> injectors)
        {
            if (injectors == null)
                return;

            foreach (var injector in injectors)
                injector.Inject(target, injectee);
        }

        /// <summary>
        /// Registers injectors for the members declared directly on the given type. Fields are processed
        /// first, then properties, then methods; each member is checked against every collector.
        /// </summary>
        /// <param name="type">Type whose declared members are inspected.</param>
        /// <param name="collectors">Collectors that receive the injectors.</param>
        private static void CreateInjectors(Type type, params Collector[] collectors)
        {
            foreach (var fld in type.GetFields(Flags))
            {
                foreach (var col in collectors)
                {
                    if (ContainsAttribute(fld, col.AttributeType, false))
                        col.Add(NewFieldInjector(type, fld, col.ResourceType));
                }
            }

            foreach (var prop in type.GetProperties(Flags))
            {
                foreach (var col in collectors)
                {
                    if (ContainsAttribute(prop, col.AttributeType, false))
                        col.Add(NewPropertyInjector(type, prop, col.ResourceType));
                }
            }

            foreach (var mthd in type.GetMethods(Flags))
            {
                foreach (var col in collectors)
                {
                    if (ContainsAttribute(mthd, col.AttributeType, false))
                        col.Add(NewMethodInjector(type, mthd, col.ResourceType));
                }
            }
        }

        /// <summary>
        /// Validates an attributed field and creates its injector.
        /// </summary>
        /// <param name="type">Declaring type (used in the error message).</param>
        /// <param name="field">Attributed field.</param>
        /// <param name="resType">Type of the resource to be injected.</param>
        /// <returns>Field injector.</returns>
        private static IResourceInjector NewFieldInjector(Type type, FieldInfo field, Type resType)
        {
            if (!field.FieldType.IsAssignableFrom(resType))
            {
                throw new IgniteException("Invalid field type for resource attribute [" +
                    "type=" + type.Name +
                    ", field=" + field.Name +
                    ", fieldType=" + field.FieldType.Name +
                    ", resourceType=" + resType.Name + ']');
            }

            return new ResourceFieldInjector(field);
        }

        /// <summary>
        /// Validates an attributed property and creates its injector. The property must be writable and
        /// its type must be able to hold the resource.
        /// </summary>
        /// <param name="type">Declaring type (used in the error message).</param>
        /// <param name="prop">Attributed property.</param>
        /// <param name="resType">Type of the resource to be injected.</param>
        /// <returns>Property injector.</returns>
        private static IResourceInjector NewPropertyInjector(Type type, PropertyInfo prop, Type resType)
        {
            if (!prop.CanWrite)
            {
                throw new IgniteException("Property with resource attribute is not writable [" +
                    "type=" + type.Name +
                    ", property=" + prop.Name +
                    ", resourceType=" + resType.Name + ']');
            }

            if (!prop.PropertyType.IsAssignableFrom(resType))
            {
                throw new IgniteException("Invalid property type for resource attribute [" +
                    "type=" + type.Name +
                    ", property=" + prop.Name +
                    ", propertyType=" + prop.PropertyType.Name +
                    ", resourceType=" + resType.Name + ']');
            }

            return new ResourcePropertyInjector(prop);
        }

        /// <summary>
        /// Validates an attributed method and creates its injector. The method must take exactly one
        /// parameter whose type can hold the resource.
        /// </summary>
        /// <param name="type">Declaring type (used in the error message).</param>
        /// <param name="mthd">Attributed method.</param>
        /// <param name="resType">Type of the resource to be injected.</param>
        /// <returns>Method injector.</returns>
        private static IResourceInjector NewMethodInjector(Type type, MethodInfo mthd, Type resType)
        {
            var mthdParams = mthd.GetParameters();

            if (mthdParams.Length != 1)
            {
                throw new IgniteException("Method with resource attribute must have only one parameter [" +
                    "type=" + type.Name +
                    ", method=" + mthd.Name +
                    ", resourceType=" + resType.Name + ']');
            }

            var paramType = mthdParams[0].ParameterType;

            if (!paramType.IsAssignableFrom(resType))
            {
                throw new IgniteException("Invalid method parameter type for resource attribute [" +
                    "type=" + type.Name +
                    ", method=" + mthd.Name +
                    ", methodParameterType=" + paramType.Name +
                    ", resourceType=" + resType.Name + ']');
            }

            return new ResourceMethodInjector(mthd);
        }

        /// <summary>
        /// Checks whether the given member carries the given attribute.
        /// </summary>
        /// <param name="member">Member to inspect.</param>
        /// <param name="attrType">Attribute type.</param>
        /// <param name="inherit">Inherit flag passed to the attribute lookup.</param>
        /// <returns>True if at least one attribute of that type is found.</returns>
        private static bool ContainsAttribute(MemberInfo member, Type attrType, bool inherit)
        {
            var found = member.GetCustomAttributes(attrType, inherit);

            return found.Length > 0;
        }

        /// <summary>
        /// Accumulates the injectors found for one (attribute type, resource type) pair.
        /// </summary>
        private class Collector
        {
            /** Attribute type. */
            private readonly Type _attrType;

            /** Resource type. */
            private readonly Type _resType;

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="attrType">Attribute type that marks an injection point.</param>
            /// <param name="resType">Type of the resource injected at such a point.</param>
            public Collector(Type attrType, Type resType)
            {
                _attrType = attrType;
                _resType = resType;
            }

            /// <summary>
            /// Attribute type.
            /// </summary>
            public Type AttributeType
            {
                get { return _attrType; }
            }

            /// <summary>
            /// Resource type.
            /// </summary>
            public Type ResourceType
            {
                get { return _resType; }
            }

            /// <summary>
            /// Collected injectors; stays null until the first <see cref="Add"/> call.
            /// </summary>
            public List<IResourceInjector> Injectors { get; private set; }

            /// <summary>
            /// Adds an injector, creating the list on first use.
            /// </summary>
            /// <param name="injector">Injector.</param>
            public void Add(IResourceInjector injector)
            {
                if (Injectors != null)
                {
                    Injectors.Add(injector);

                    return;
                }

                Injectors = new List<IResourceInjector> { injector };
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceContext.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceContext.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceContext.cs
// new file mode 100644
// index 0000000..f5674f3
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceContext.cs
// @@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Services
{
    using System;
    using System.Diagnostics;
    using Apache.Ignite.Core.Portable;
    using Apache.Ignite.Core.Services;

    /// <summary>
    /// Service context whose values are read from a raw portable reader at construction time.
    /// </summary>
    internal class ServiceContext : IServiceContext
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="ServiceContext"/> class.
        /// </summary>
        /// <param name="reader">Reader positioned at the serialized context.</param>
        public ServiceContext(IPortableRawReader reader)
        {
            Debug.Assert(reader != null);

            // Values are consumed sequentially from the reader, so the order of these reads must not change.
            var name = reader.ReadString();
            var execId = reader.ReadGuid();
            var cancelled = reader.ReadBoolean();
            var cacheName = reader.ReadString();
            var affKey = reader.ReadObject<object>();

            Name = name;
            ExecutionId = execId.GetValueOrDefault(); // A missing id becomes Guid.Empty.
            IsCancelled = cancelled;
            CacheName = cacheName;
            AffinityKey = affKey;
        }

        /** <inheritdoc /> */
        public string Name { get; private set; }

        /** <inheritdoc /> */
        public Guid ExecutionId { get; private set; }

        /** <inheritdoc /> */
        public bool IsCancelled { get; private set; }

        /** <inheritdoc /> */
        public string CacheName { get; private set; }

        /** <inheritdoc /> */
        public object AffinityKey { get; private set; }
    }
}
// --- Patch preamble of the next file in this commit, kept verbatim ---
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs
// new file mode 100644
// index 0000000..9bd9814
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceDescriptor.cs
// @@ -0,0 +1,106 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */

namespace Apache.Ignite.Core.Impl.Services
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using Apache.Ignite.Core.Impl.Collections;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Services;

    /// <summary>
    /// Service descriptor populated from a portable reader. The service type is resolved lazily
    /// through the owning <see cref="IServices"/> instance.
    /// </summary>
    internal class ServiceDescriptor : IServiceDescriptor
    {
        /** Services facade used to resolve the service type on demand. */
        private readonly IServices _services;

        /** Service type; null until the Type property has been resolved successfully. */
        private Type _type;

        /// <summary>
        /// Initializes a new instance of the <see cref="ServiceDescriptor" /> class.
        /// </summary>
        /// <param name="name">Service name.</param>
        /// <param name="reader">Reader positioned right after the service name.</param>
        /// <param name="services">Services.</param>
        public ServiceDescriptor(string name, PortableReaderImpl reader, IServices services)
        {
            Debug.Assert(reader != null);
            Debug.Assert(services != null);
            Debug.Assert(!string.IsNullOrEmpty(name));

            _services = services;
            Name = name;

            // Values are consumed sequentially from the reader, so the order of these reads must not change.
            CacheName = reader.ReadString();
            MaxPerNodeCount = reader.ReadInt();
            TotalCount = reader.ReadInt();
            OriginNodeId = reader.ReadGuid().GetValueOrDefault();
            AffinityKey = reader.ReadObject<object>();

            // Topology snapshot: an entry count followed by (node id, int) pairs.
            var entryCnt = reader.ReadInt();
            var topology = new Dictionary<Guid, int>(entryCnt);

            for (var idx = 0; idx < entryCnt; idx++)
            {
                // A missing node id is stored under Guid.Empty.
                var nodeId = reader.ReadGuid().GetValueOrDefault();

                topology[nodeId] = reader.ReadInt();
            }

            TopologySnapshot = topology.AsReadOnly();
        }

        /** <inheritdoc /> */
        public string Name { get; private set; }

        /** <inheritdoc /> */
        public Type Type
        {
            get
            {
                if (_type != null)
                    return _type;

                try
                {
                    // The type is taken from the object returned by GetServiceProxy for this service name.
                    _type = _services.GetServiceProxy<IService>(Name).GetType();

                    return _type;
                }
                catch (Exception ex)
                {
                    throw new ServiceInvocationException(
                        "Failed to retrieve service type. It has either been cancelled, or is not a .Net service", ex);
                }
            }
        }

        /** <inheritdoc /> */
        public int TotalCount { get; private set; }

        /** <inheritdoc /> */
        public int MaxPerNodeCount { get; private set; }

        /** <inheritdoc /> */
        public string CacheName { get; private set; }

        /** <inheritdoc /> */
        public object AffinityKey { get; private set; }

        /** <inheritdoc /> */
        public Guid OriginNodeId { get; private set; }

        /** <inheritdoc /> */
        public IDictionary<Guid, int> TopologySnapshot { get; private set; }
    }
}
// --- Patch preamble of the next file in this commit, kept verbatim ---
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxy.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxy.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxy.cs
// new file mode 100644
// index 0000000..ebb4c84
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxy.cs
// @@ -0,0 +1,71 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */

namespace Apache.Ignite.Core.Impl.Services
{
    using System;
    using System.Diagnostics;
    using System.Reflection;
    using System.Runtime.Remoting.Messaging;
    using System.Runtime.Remoting.Proxies;

    /// <summary>
    /// Service proxy: user works with a remote service as if it is a local object.
    /// Every call made on the transparent proxy is forwarded to a delegate supplied at construction.
    /// </summary>
    /// <typeparam name="T">User type to be proxied.</typeparam>
    internal class ServiceProxy<T> : RealProxy
    {
        /** Delegate that receives the called method and its arguments and returns the call result. */
        private readonly Func<MethodBase, object[], object> _invokeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="ServiceProxy{T}" /> class.
        /// </summary>
        /// <param name="invokeAction">Method invoke action.</param>
        public ServiceProxy(Func<MethodBase, object[], object> invokeAction)
            : base(typeof (T))
        {
            Debug.Assert(invokeAction != null);

            _invokeAction = invokeAction;
        }

        /// <summary>
        /// Forwards a method call message to the invoke action and wraps its result in a return message.
        /// </summary>
        /// <param name="msg">Message describing the intercepted call.</param>
        /// <returns>Return message that carries the result of the invoke action.</returns>
        /// <exception cref="NotSupportedException">
        /// The message is not a method call, or the call declares arguments that are not input arguments.
        /// </exception>
        public override IMessage Invoke(IMessage msg)
        {
            var call = msg as IMethodCallMessage;

            if (call == null)
            {
                throw new NotSupportedException("Service proxy operation type not supported: " + msg.GetType() +
                    ". Only method and property calls are supported.");
            }

            // Total argument count differs from input argument count when the method has out arguments.
            if (call.ArgCount != call.InArgCount)
            {
                throw new NotSupportedException("Service proxy does not support out arguments: " +
                    call.MethodBase);
            }

            var mthd = call.MethodBase;
            var res = _invokeAction(mthd, call.Args);

            return new ReturnMessage(res, null, 0, call.LogicalCallContext, call);
        }

        /// <summary>
        /// Returns the transparent proxy cast to the proxied type.
        /// </summary>
        /// <returns>Transparent proxy as <typeparamref name="T"/>.</returns>
        public new T GetTransparentProxy()
        {
            var proxy = base.GetTransparentProxy();

            return (T) proxy;
        }
    }
}
// --- Patch preamble of the next file in this commit, kept verbatim ---
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs
// new file mode 100644
// index 0000000..fa5da17
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxyInvoker.cs
// @@ -0,0 +1,136 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */

namespace Apache.Ignite.Core.Impl.Services
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Reflection;

    /// <summary>
    /// Invokes service proxy methods.
    /// </summary>
    internal static class ServiceProxyInvoker
    {
        /// <summary>
        /// Resolves a method of the service by name (and, when the name is overloaded, by argument types),
        /// invokes it and reports the outcome as a pair.
        /// </summary>
        /// <param name="svc">Service instance.</param>
        /// <param name="methodName">Name of the method.</param>
        /// <param name="arguments">Arguments; may be null.</param>
        /// <returns>
        /// Pair of method return value and invocation exception. Exactly one side is meaningful:
        /// on failure the key is null and the value holds the exception.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// No method, or more than one method, matches the name and arguments. Resolution happens before
        /// the invocation, so this exception is thrown and not returned in the pair.
        /// </exception>
        public static KeyValuePair<object, Exception> InvokeServiceMethod(object svc, string methodName,
            object[] arguments)
        {
            Debug.Assert(svc != null);
            Debug.Assert(!string.IsNullOrWhiteSpace(methodName));

            var target = GetMethodOrThrow(svc.GetType(), methodName, arguments);

            object res;
            Exception failure;

            try
            {
                res = target.Invoke(svc, arguments);
                failure = null;
            }
            catch (TargetInvocationException invokeErr)
            {
                // Report the exception thrown by the service method itself, not the reflection wrapper.
                res = null;
                failure = invokeErr.InnerException;
            }
            catch (Exception err)
            {
                res = null;
                failure = err;
            }

            return new KeyValuePair<object, Exception>(res, failure);
        }

        /// <summary>
        /// Finds suitable method in the specified type, or throws an exception.
        /// </summary>
        /// <param name="svcType">Service type.</param>
        /// <param name="methodName">Method name without any interface prefix.</param>
        /// <param name="arguments">Call arguments; may be null.</param>
        /// <returns>The single matching method.</returns>
        private static MethodBase GetMethodOrThrow(Type svcType, string methodName, object[] arguments)
        {
            Debug.Assert(svcType != null);
            Debug.Assert(!string.IsNullOrWhiteSpace(methodName));

            // 1) Collect public and non-public instance methods with the requested name.
            var byName = new List<MethodInfo>();

            foreach (var candidate in
                svcType.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance))
            {
                if (CleanupMethodName(candidate) == methodName)
                    byName.Add(candidate);
            }

            switch (byName.Count)
            {
                case 0:
                    throw new InvalidOperationException(
                        string.Format("Failed to invoke proxy: there is no method '{0}' in type '{1}'",
                            methodName, svcType));

                case 1:
                    // A unique name is accepted as is; arguments are not checked in this case.
                    return byName[0];
            }

            // 2) The name is overloaded - keep only the overloads that accept the given arguments.
            var byArgs = byName.FindAll(m => AreMethodArgsCompatible(arguments, m.GetParameters()));

            if (byArgs.Count == 1)
                return byArgs[0];

            // 3) Nothing or several overloads left - describe the arguments and throw.
            string argsString;

            if (arguments == null || arguments.Length == 0)
            {
                argsString = "0";
            }
            else
            {
                var typeNames = arguments.Select(x => x == null ? "null" : x.GetType().Name).ToArray();

                argsString = "(" + string.Join(", ", typeNames) + ")";
            }

            if (byArgs.Count == 0)
            {
                throw new InvalidOperationException(
                    string.Format("Failed to invoke proxy: there is no method '{0}' in type '{1}' with {2} arguments",
                        methodName, svcType, argsString));
            }

            throw new InvalidOperationException(
                string.Format("Failed to invoke proxy: there are {2} methods '{0}' in type '{1}' with {3} " +
                              "arguments, can't resolve ambiguity.", methodName, svcType, byArgs.Count, argsString));
        }

        /// <summary>
        /// Cleans up a method name by removing interface part,
        /// which occurs when explicit interface implementation is used.
        /// </summary>
        /// <param name="method">Method.</param>
        /// <returns>Text after the last type delimiter, or the whole name when there is no delimiter.</returns>
        private static string CleanupMethodName(MethodBase method)
        {
            var fullName = method.Name;

            var lastDelim = fullName.LastIndexOf(Type.Delimiter);

            if (lastDelim < 0)
                return fullName;

            return fullName.Substring(lastDelim + 1);
        }

        /// <summary>
        /// Determines whether specified method arguments are compatible with given method parameter definitions.
        /// </summary>
        /// <param name="methodArgs">Method arguments; null is treated like an empty array.</param>
        /// <param name="targetParameters">Target method parameter definitions.</param>
        /// <returns>True if a target method can be called with specified set of arguments; otherwise, false.</returns>
        private static bool AreMethodArgsCompatible(object[] methodArgs, ParameterInfo[] targetParameters)
        {
            if (methodArgs == null || methodArgs.Length == 0)
                return targetParameters.Length == 0;

            if (methodArgs.Length != targetParameters.Length)
                return false;

            for (var idx = 0; idx < methodArgs.Length; idx++)
            {
                var arg = methodArgs[idx];

                // A null argument is accepted for any parameter type.
                if (arg != null && !targetParameters[idx].ParameterType.IsInstanceOfType(arg))
                    return false;
            }

            return true;
        }
    }
}
// --- Patch preamble of the next file in this commit, kept verbatim ---
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs
// new file mode 100644
// index 0000000..e7af8da
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs
// @@ -0,0 +1,140 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License.
// You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */

namespace Apache.Ignite.Core.Impl.Services
{
    using System;
    using System.Diagnostics;
    using System.Reflection;
    using Apache.Ignite.Core.Impl.Portable;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Portable;
    using Apache.Ignite.Core.Services;

    /// <summary>
    /// Static helpers that write and read service proxy invocations and their results.
    /// </summary>
    internal static class ServiceProxySerializer
    {
        /// <summary>
        /// Writes proxy method invocation data to the specified writer: the method name, a flag telling
        /// whether an argument array is present and, if it is, the argument count followed by each argument.
        /// </summary>
        /// <param name="writer">Writer.</param>
        /// <param name="method">Method.</param>
        /// <param name="arguments">Arguments; may be null.</param>
        public static void WriteProxyMethod(PortableWriterImpl writer, MethodBase method, object[] arguments)
        {
            Debug.Assert(writer != null);
            Debug.Assert(method != null);

            writer.WriteString(method.Name);

            var hasArgs = arguments != null;

            writer.WriteBoolean(hasArgs);

            if (!hasArgs)
                return;

            writer.WriteInt(arguments.Length);

            for (var idx = 0; idx < arguments.Length; idx++)
                writer.WriteObject(arguments[idx]);
        }

        /// <summary>
        /// Reads proxy method invocation data from the specified stream: a server "keep portable" flag,
        /// the method name, an arguments-present flag and, if set, the argument count and the arguments.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="mthdName">Method name.</param>
        /// <param name="mthdArgs">Method arguments; null when the stream carries no argument array.</param>
        public static void ReadProxyMethod(IPortableStream stream, PortableMarshaller marsh,
            out string mthdName, out object[] mthdArgs)
        {
            var reader = marsh.StartUnmarshal(stream);

            var srvKeepPortable = reader.ReadBoolean();

            mthdName = reader.ReadString();

            if (!reader.ReadBoolean())
            {
                mthdArgs = null;

                return;
            }

            var argCnt = reader.ReadInt();

            mthdArgs = new object[argCnt];

            // With the server flag set, the arguments are read through a reader started with 'true'
            // as the second StartUnmarshal argument.
            if (srvKeepPortable)
                reader = marsh.StartUnmarshal(stream, true);

            for (var idx = 0; idx < argCnt; idx++)
                mthdArgs[idx] = reader.ReadObject<object>();
        }

        /// <summary>
        /// Writes method invocation result.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="methodResult">Method result; written only when there is no error.</param>
        /// <param name="invocationError">Method invocation error; when not null it is written instead of the result.</param>
        public static void WriteInvocationResult(IPortableStream stream, PortableMarshaller marsh, object methodResult,
            Exception invocationError)
        {
            Debug.Assert(stream != null);
            Debug.Assert(marsh != null);

            var writer = marsh.StartMarshal(stream);

            var success = invocationError == null;
            object payload = success ? methodResult : invocationError;

            PortableUtils.WriteInvocationResult(writer, success, payload);
        }

        /// <summary>
        /// Reads method invocation result.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="marsh">Marshaller.</param>
        /// <param name="keepPortable">Portable flag: selects ForcePortable mode instead of Deserialize.</param>
        /// <returns>Method invocation result.</returns>
        /// <exception cref="ServiceInvocationException">The stream carries an invocation error.</exception>
        public static object ReadInvocationResult(IPortableStream stream, PortableMarshaller marsh, bool keepPortable)
        {
            Debug.Assert(stream != null);
            Debug.Assert(marsh != null);

            var mode = keepPortable ? PortableMode.ForcePortable : PortableMode.Deserialize;

            var reader = marsh.StartUnmarshal(stream, mode);

            object err;

            var res = PortableUtils.ReadInvocationResult(reader, out err);

            if (err == null)
                return res;

            // The error is either a portable object or an exception; each has its own message and constructor.
            var portErr = err as IPortableObject;

            if (portErr != null)
            {
                throw new ServiceInvocationException("Proxy method invocation failed with a portable error. " +
                                                     "Examine PortableCause for details.", portErr);
            }

            throw new ServiceInvocationException("Proxy method invocation failed with an exception. " +
                                                 "Examine InnerException for details.", (Exception) err);
        }
    }
}
// --- Patch preamble of the next file in this commit, kept verbatim ---
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
// new file mode 100644
// index 0000000..38a7175
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs
// @@ -0,0 +1,316 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
+ */ + +namespace Apache.Ignite.Core.Impl.Services +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Reflection; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable; + using Apache.Ignite.Core.Impl.Unmanaged; + using Apache.Ignite.Core.Services; + using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; + + /// <summary> + /// Services implementation. + /// </summary> + internal class Services : PlatformTarget, IServices + { + /** */ + private const int OpDeploy = 1; + + /** */ + private const int OpDeployMultiple = 2; + + /** */ + private const int OpDotnetServices = 3; + + /** */ + private const int OpInvokeMethod = 4; + + /** */ + private const int OpDescriptors = 5; + + /** */ + private readonly IClusterGroup _clusterGroup; + + /** Invoker portable flag. */ + protected readonly bool KeepPortable; + + /** Server portable flag. */ + protected readonly bool SrvKeepPortable; + + /// <summary> + /// Initializes a new instance of the <see cref="Services" /> class. 
+ /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + /// <param name="clusterGroup">Cluster group.</param> + /// <param name="keepPortable">Invoker portable flag.</param> + /// <param name="srvKeepPortable">Server portable flag.</param> + public Services(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup, + bool keepPortable, bool srvKeepPortable) + : base(target, marsh) + { + Debug.Assert(clusterGroup != null); + + _clusterGroup = clusterGroup; + KeepPortable = keepPortable; + SrvKeepPortable = srvKeepPortable; + } + + /** <inheritDoc /> */ + public virtual IServices WithKeepPortable() + { + if (KeepPortable) + return this; + + return new Services(Target, Marshaller, _clusterGroup, true, SrvKeepPortable); + } + + /** <inheritDoc /> */ + public virtual IServices WithServerKeepPortable() + { + if (SrvKeepPortable) + return this; + + return new Services(UU.ServicesWithServerKeepPortable(Target), Marshaller, _clusterGroup, KeepPortable, true); + } + + /** <inheritDoc /> */ + public virtual IServices WithAsync() + { + return new ServicesAsync(UU.ServicesWithAsync(Target), Marshaller, _clusterGroup, KeepPortable, SrvKeepPortable); + } + + /** <inheritDoc /> */ + public virtual bool IsAsync + { + get { return false; } + } + + /** <inheritDoc /> */ + public virtual IFuture GetFuture() + { + throw new InvalidOperationException("Asynchronous mode is disabled"); + } + + /** <inheritDoc /> */ + public virtual IFuture<TResult> GetFuture<TResult>() + { + throw new InvalidOperationException("Asynchronous mode is disabled"); + } + + /** <inheritDoc /> */ + public IClusterGroup ClusterGroup + { + get { return _clusterGroup; } + } + + /** <inheritDoc /> */ + public void DeployClusterSingleton(string name, IService service) + { + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + IgniteArgumentCheck.NotNull(service, "service"); + + DeployMultiple(name, service, 1, 1); + } + + /** <inheritDoc /> */ + 
+        public void DeployNodeSingleton(string name, IService service)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
+
+            // Total count of 0 with at most one instance per node.
+            DeployMultiple(name, service, totalCount: 0, maxPerNodeCount: 1);
+        }
+
+        /** <inheritDoc /> */
+        public void DeployKeyAffinitySingleton<TK>(string name, IService service, string cacheName, TK affinityKey)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
+            IgniteArgumentCheck.NotNull(affinityKey, "affinityKey");
+
+            var cfg = new ServiceConfiguration
+            {
+                Name = name,
+                Service = service,
+                CacheName = cacheName,
+                AffinityKey = affinityKey,
+                TotalCount = 1,
+                MaxPerNodeCount = 1
+            };
+
+            Deploy(cfg);
+        }
+
+        /** <inheritDoc /> */
+        public void DeployMultiple(string name, IService service, int totalCount, int maxPerNodeCount)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.NotNull(service, "service");
+
+            DoOutOp(OpDeployMultiple, writer =>
+            {
+                writer.WriteString(name);
+                writer.WriteObject(service);
+                writer.WriteInt(totalCount);
+                writer.WriteInt(maxPerNodeCount);
+            });
+        }
+
+        /** <inheritDoc /> */
+        public void Deploy(ServiceConfiguration configuration)
+        {
+            IgniteArgumentCheck.NotNull(configuration, "configuration");
+
+            DoOutOp(OpDeploy, writer =>
+            {
+                writer.WriteString(configuration.Name);
+                writer.WriteObject(configuration.Service);
+                writer.WriteInt(configuration.TotalCount);
+                writer.WriteInt(configuration.MaxPerNodeCount);
+                writer.WriteString(configuration.CacheName);
+                writer.WriteObject(configuration.AffinityKey);
+
+                // The node filter is always written as a holder; null when no filter is configured.
+                var filter = configuration.NodeFilter;
+
+                PortableOrSerializableObjectHolder holder = filter == null
+                    ? null
+                    : new PortableOrSerializableObjectHolder(filter);
+
+                writer.WriteObject(holder);
+            });
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel(string name)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+
+            UU.ServicesCancel(Target, name);
+        }
+
+        /** <inheritDoc /> */
+        public void CancelAll()
+        {
+            UU.ServicesCancelAll(Target);
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<IServiceDescriptor> GetServiceDescriptors()
+        {
+            return DoInOp(OpDescriptors, input =>
+            {
+                var rdr = Marshaller.StartUnmarshal(input, KeepPortable);
+
+                var cnt = rdr.ReadInt();
+
+                var descriptors = new List<IServiceDescriptor>(cnt);
+
+                // Each descriptor starts with its name; the rest is read by the descriptor itself.
+                for (var idx = 0; idx < cnt; idx++)
+                    descriptors.Add(new ServiceDescriptor(rdr.ReadString(), rdr, this));
+
+                return descriptors;
+            });
+        }
+
+        /** <inheritDoc /> */
+        public T GetService<T>(string name)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+
+            var found = GetServices<T>(name);
+
+            return found == null ? default(T) : found.FirstOrDefault();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<T> GetServices<T>(string name)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+
+            return DoOutInOp<ICollection<T>>(OpDotnetServices, w => w.WriteString(name),
+                rdr =>
+                {
+                    // A leading "false" means there is no value to read.
+                    if (!rdr.ReadBool())
+                        return null;
+
+                    var cnt = rdr.ReadInt();
+
+                    var instances = new List<T>(cnt);
+
+                    // Each entry is a handle resolved through the local handle registry.
+                    for (var idx = 0; idx < cnt; idx++)
+                    {
+                        var svc = Marshaller.Ignite.HandleRegistry.Get<IService>(rdr.ReadLong());
+
+                        instances.Add((T)svc);
+                    }
+
+                    return instances;
+                });
+        }
+
+        /** <inheritDoc /> */
+        public T GetServiceProxy<T>(string name) where T : class
+        {
+            return GetServiceProxy<T>(name, false);
+        }
+
+        /** <inheritDoc /> */
+        public T GetServiceProxy<T>(string name, bool sticky) where T : class
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(name, "name");
+            IgniteArgumentCheck.Ensure(typeof(T).IsInterface, "T", "Service proxy type should be an interface: " + typeof(T));
+
+            // Prefer the local service instance over a proxy when it implements T.
+            // It is fetched as object because the proxy interface may differ from the real one.
+            var local = GetService<object>(name) as T;
+
+            if (local == null)
+            {
+                var javaProxy = UU.ServicesGetServiceProxy(Target, name, sticky);
+
+                var proxy = new ServiceProxy<T>((method, args) => InvokeProxyMethod(javaProxy, method, args));
+
+                return proxy.GetTransparentProxy();
+            }
+
+            return local;
+        }
+
+        /// <summary>
+        /// Sends a proxy method call to the given unmanaged proxy and reads back the result.
+        /// </summary>
+        /// <param name="proxy">Unmanaged proxy.</param>
+        /// <param name="method">Method to invoke.</param>
+        /// <param name="args">Arguments.</param>
+        /// <returns>
+        /// Invocation result.
+        /// </returns>
+        private unsafe object InvokeProxyMethod(IUnmanagedTarget proxy, MethodBase method, object[] args)
+        {
+            return DoOutInOp(OpInvokeMethod,
+                w => ServiceProxySerializer.WriteProxyMethod(w, method, args),
+                input => ServiceProxySerializer.ReadInvocationResult(input, Marshaller, KeepPortable),
+                proxy.Target);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServicesAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServicesAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServicesAsync.cs
new file mode 100644
index 0000000..860de45
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Services/ServicesAsync.cs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Services
+{
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using Apache.Ignite.Core.Services;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Async services implementation: same operations as <see cref="Services" />, but
+    /// <see cref="IsAsync" /> is true and futures are available through GetFuture.
+    /// </summary>
+    internal class ServicesAsync : Services
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ServicesAsync" /> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="clusterGroup">Cluster group.</param>
+        /// <param name="keepPortable">Portable flag.</param>
+        /// <param name="srvKeepPortable">Server portable flag.</param>
+        public ServicesAsync(IUnmanagedTarget target, PortableMarshaller marsh, IClusterGroup clusterGroup,
+            bool keepPortable, bool srvKeepPortable)
+            : base(target, marsh, clusterGroup, keepPortable, srvKeepPortable)
+        {
+            // No-op
+        }
+
+        /** <inheritDoc /> */
+        public override bool IsAsync
+        {
+            get { return true; }
+        }
+
+        /** <inheritDoc /> */
+        public override IServices WithKeepPortable()
+        {
+            if (KeepPortable)
+                return this;
+
+            // The invoker-side flag is a pure .NET setting, so the same target is reused.
+            return new ServicesAsync(Target, Marshaller, ClusterGroup, true, SrvKeepPortable);
+        }
+
+        /** <inheritDoc /> */
+        public override IServices WithServerKeepPortable()
+        {
+            if (SrvKeepPortable)
+                return this;
+
+            // Obtain a new unmanaged target the same way the synchronous
+            // Services.WithServerKeepPortable does. Reusing Target would only flip the
+            // .NET-side flag (presumably the server-side mode is carried by the target).
+            return new ServicesAsync(UU.ServicesWithServerKeepPortable(Target), Marshaller, ClusterGroup,
+                KeepPortable, true);
+        }
+
+        /** <inheritDoc /> */
+        public override IServices WithAsync()
+        {
+            // Already asynchronous.
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public override IFuture GetFuture()
+        {
+            return GetFuture<object>();
+        }
+
+        /** <inheritDoc /> */
+        public override IFuture<T> GetFuture<T>()
+        {
+            // Ask the unmanaged side to notify the future identified by futId.
+            return GetFuture<T>((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp));
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/AsyncTransaction.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/AsyncTransaction.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/AsyncTransaction.cs
new file mode 100644
index 0000000..82d1d55
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/AsyncTransaction.cs
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using System.Threading;
+    using Apache.Ignite.Core.Common;
+
+    /// <summary>
+    /// Grid async transaction facade.
+    /// </summary>
+    internal class AsyncTransaction : Transaction
+    {
+        /** Per-thread future of the last async operation started through this facade. */
+        private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncTransaction"/> class.
+        /// </summary>
+        /// <param name="tx">The tx to wrap.</param>
+        public AsyncTransaction(TransactionImpl tx) : base(tx)
+        {
+            // No-op.
+        }
+
+        /** <inheritDoc /> */
+        public override bool IsAsync
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public override IFuture<TResult> GetFuture<TResult>()
+        {
+            var untyped = GetFuture();
+
+            // NOTE(review): "as" yields null when the pending future is not an IFuture<TResult>,
+            // and the pending future has already been consumed by then - confirm this is intended.
+            return untyped as IFuture<TResult>;
+        }
+
+        /** <inheritDoc /> */
+        public override IFuture GetFuture()
+        {
+            var pending = _curFut.Value;
+
+            if (pending != null)
+            {
+                // A future can be taken only once per started operation.
+                _curFut.Value = null;
+
+                return pending;
+            }
+
+            throw new InvalidOperationException("Asynchronous operation not started.");
+        }
+
+        /** <inheritDoc /> */
+        public override void Commit()
+        {
+            // Remember the future for the current thread; it is handed out by GetFuture.
+            var fut = Tx.GetFutureOrError(Tx.CommitAsync);
+
+            _curFut.Value = fut;
+        }
+
+        /** <inheritDoc /> */
+        public override void Rollback()
+        {
+            // Remember the future for the current thread; it is handed out by GetFuture.
+            var fut = Tx.GetFutureOrError(Tx.RollbackAsync);
+
+            _curFut.Value = fut;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
new file mode 100644
index 0000000..47c9f93
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Ignite transaction facade: a thin synchronous wrapper that forwards every member
+    /// to the wrapped <see cref="TransactionImpl" />.
+    /// </summary>
+    internal class Transaction : ITransaction
+    {
+        /** Wrapped transaction implementation that all members delegate to. */
+        protected readonly TransactionImpl Tx;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Transaction" /> class.
+        /// </summary>
+        /// <param name="tx">The tx to wrap.</param>
+        public Transaction(TransactionImpl tx)
+        {
+            Tx = tx;
+        }
+
+        /** <inheritDoc /> */
+        public void Dispose()
+        {
+            Tx.Dispose();
+        }
+
+        /** <inheritDoc /> */
+        public ITransaction WithAsync()
+        {
+            // The async facade wraps the same underlying transaction.
+            return new AsyncTransaction(Tx);
+        }
+
+        /** <inheritDoc /> */
+        public virtual bool IsAsync
+        {
+            get { return false; }
+        }
+
+        /** <inheritDoc /> */
+        public virtual IFuture GetFuture()
+        {
+            // The synchronous facade has no futures; AsyncTransaction overrides this.
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> */
+        public virtual IFuture<TResult> GetFuture<TResult>()
+        {
+            // The synchronous facade has no futures; AsyncTransaction overrides this.
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> */
+        public Guid NodeId
+        {
+            get { return Tx.NodeId; }
+        }
+
+        /** <inheritDoc /> */
+        public long ThreadId
+        {
+            get { return Tx.ThreadId; }
+        }
+
+        /** <inheritDoc /> */
+        public DateTime StartTime
+        {
+            get { return Tx.StartTime; }
+        }
+
+        /** <inheritDoc /> */
+        public TransactionIsolation Isolation
+        {
+            get { return Tx.Isolation; }
+        }
+
+        /** <inheritDoc /> */
+        public TransactionConcurrency Concurrency
+        {
+            get { return Tx.Concurrency; }
+        }
+
+        /** <inheritDoc /> */
+        public TransactionState State
+        {
+            get { return Tx.State; }
+        }
+
+        /** <inheritDoc /> */
+        public TimeSpan Timeout
+        {
+            get { return Tx.Timeout; }
+        }
+
+        /** <inheritDoc /> */
+        public bool IsRollbackOnly
+        {
+            get { return Tx.IsRollbackOnly; }
+        }
+
+        /** <inheritDoc /> */
+        // NOTE(review): spelled "SetRollbackonly" here but "SetRollbackOnly" on TransactionImpl;
+        // presumably the spelling is dictated by ITransaction - confirm before renaming.
+        public bool SetRollbackonly()
+        {
+            return Tx.SetRollbackOnly();
+        }
+
+        /** <inheritDoc /> */
+        public virtual void Commit()
+        {
+            Tx.Commit();
+        }
+
+        /** <inheritDoc /> */
+        public virtual void Rollback()
+        {
+            Tx.Rollback();
+        }
+
+        /** <inheritDoc /> */
+        public void AddMeta<TV>(string name, TV val)
+        {
+            Tx.AddMeta(name, val);
+        }
+
+        /** <inheritDoc /> */
+        public TV Meta<TV>(string name)
+        {
+            return Tx.Meta<TV>(name);
+        }
+
+        /** <inheritDoc /> */
+        public TV RemoveMeta<TV>(string name)
+        {
+            return Tx.RemoveMeta<TV>(name);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
new file mode 100644
index 0000000..9e71181
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using System.Threading;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Grid cache transaction implementation.
+    /// </summary>
+    internal sealed class TransactionImpl
+    {
+        /** Metadata kept as a flat array of alternating name/value slots; a null name marks a free slot. */
+        private object[] _metas;
+
+        /** Unique transaction ID. */
+        private readonly long _id;
+
+        /** Owning transactions facade; commit, rollback, close and state queries go through it. */
+        private readonly TransactionsImpl _txs;
+
+        /** TX concurrency. */
+        private readonly TransactionConcurrency _concurrency;
+
+        /** TX isolation. */
+        private readonly TransactionIsolation _isolation;
+
+        /** Timeout. */
+        private readonly TimeSpan _timeout;
+
+        /** Start time (local time captured in the constructor). */
+        private readonly DateTime _startTime;
+
+        /** Owning thread ID (managed ID of the thread that ran the constructor). */
+        private readonly int _threadId;
+
+        /** Originating node ID. */
+        private readonly Guid _nodeId;
+
+        /** State holder; null while the transaction is open, non-null once it is closed. */
+        private StateHolder _state;
+
+        // ReSharper disable once InconsistentNaming
+        /** Transaction for this thread. */
+        [ThreadStatic]
+        private static TransactionImpl THREAD_TX;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="id">ID.</param>
+        /// <param name="txs">Transactions.</param>
+        /// <param name="concurrency">TX concurrency.</param>
+        /// <param name="isolation">TX isolation.</param>
+        /// <param name="timeout">Timeout.</param>
+        /// <param name="nodeId">The originating node identifier.</param>
+        public TransactionImpl(long id, TransactionsImpl txs, TransactionConcurrency concurrency,
+            TransactionIsolation isolation, TimeSpan timeout, Guid nodeId)
+        {
+            _id = id;
+            _nodeId = nodeId;
+            _txs = txs;
+
+            _concurrency = concurrency;
+            _isolation = isolation;
+            _timeout = timeout;
+
+            _threadId = Thread.CurrentThread.ManagedThreadId;
+            _startTime = DateTime.Now;
+
+            // Register as the transaction of the creating thread.
+            THREAD_TX = this;
+        }
+
+        /// <summary>
+        /// Transaction assigned to this thread, or null when there is none or it is closed.
+        /// </summary>
+        public static Transaction Current
+        {
+            get
+            {
+                var current = THREAD_TX;
+
+                if (current != null && !current.IsClosed)
+                    return new Transaction(current);
+
+                // Forget a closed transaction so later reads return null without re-checking it.
+                if (current != null)
+                    THREAD_TX = null;
+
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Commits this tx and closes it.
+        /// </summary>
+        public void Commit()
+        {
+            lock (this)
+            {
+                ThrowIfClosed();
+
+                var finalState = _txs.TxCommit(this);
+
+                _state = new StateHolder(finalState);
+            }
+        }
+
+        /// <summary>
+        /// Rolls this tx back and closes it.
+        /// </summary>
+        public void Rollback()
+        {
+            lock (this)
+            {
+                ThrowIfClosed();
+
+                var finalState = _txs.TxRollback(this);
+
+                _state = new StateHolder(finalState);
+            }
+        }
+
+        /// <summary>
+        /// Sets the rollback only flag.
+        /// </summary>
+        public bool SetRollbackOnly()
+        {
+            lock (this)
+            {
+                ThrowIfClosed();
+
+                var flagSet = _txs.TxSetRollbackOnly(this);
+
+                return flagSet;
+            }
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether this instance is rollback only.
+        /// </summary>
+        public bool IsRollbackOnly
+        {
+            get
+            {
+                lock (this)
+                {
+                    TransactionState current;
+
+                    if (_state == null)
+                        current = State;
+                    else
+                        current = _state.State;
+
+                    switch (current)
+                    {
+                        case TransactionState.MarkedRollback:
+                        case TransactionState.RollingBack:
+                        case TransactionState.RolledBack:
+                            return true;
+
+                        default:
+                            return false;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the state: the recorded final state once closed, otherwise the state reported
+        /// by the transactions facade.
+        /// </summary>
+        public TransactionState State
+        {
+            get
+            {
+                lock (this)
+                {
+                    if (_state != null)
+                        return _state.State;
+
+                    return _txs.TxState(this);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the isolation.
+        /// </summary>
+        public TransactionIsolation Isolation
+        {
+            get
+            {
+                return _isolation;
+            }
+        }
+
+        /// <summary>
+        /// Gets the concurrency.
+        /// </summary>
+        public TransactionConcurrency Concurrency
+        {
+            get
+            {
+                return _concurrency;
+            }
+        }
+
+        /// <summary>
+        /// Gets the timeout.
+        /// </summary>
+        public TimeSpan Timeout
+        {
+            get
+            {
+                return _timeout;
+            }
+        }
+
+        /// <summary>
+        /// Gets the start time.
+        /// </summary>
+        public DateTime StartTime
+        {
+            get
+            {
+                return _startTime;
+            }
+        }
+
+        /// <summary>
+        /// Gets the node identifier.
+        /// </summary>
+        public Guid NodeId
+        {
+            get
+            {
+                return _nodeId;
+            }
+        }
+
+        /// <summary>
+        /// Gets the thread identifier.
+        /// </summary>
+        public long ThreadId
+        {
+            get
+            {
+                return _threadId;
+            }
+        }
+
+        /// <summary>
+        /// Adds a new metadata entry, or overwrites the value stored under an existing name.
+        /// </summary>
+        public void AddMeta<TV>(string name, TV val)
+        {
+            if (name == null)
+                throw new ArgumentException("Meta name cannot be null.");
+
+            lock (this)
+            {
+                if (_metas == null)
+                {
+                    _metas = new object[] { name, val };
+
+                    return;
+                }
+
+                // Names sit at even indexes, values right after them.
+                int freeIdx = -1;
+
+                for (int i = 0; i < _metas.Length; i += 2)
+                {
+                    var key = _metas[i];
+
+                    if (name.Equals(key))
+                    {
+                        _metas[i + 1] = val;
+
+                        return;
+                    }
+
+                    // Remember the first free slot for reuse.
+                    if (freeIdx == -1 && key == null)
+                        freeIdx = i;
+                }
+
+                if (freeIdx == -1)
+                {
+                    // No free slot: grow the array by one name/value pair.
+                    freeIdx = _metas.Length;
+
+                    Array.Resize(ref _metas, freeIdx + 2);
+                }
+
+                _metas[freeIdx] = name;
+                _metas[freeIdx + 1] = val;
+            }
+        }
+
+        /// <summary>
+        /// Gets metadata by name; returns default(TV) when the name is not present.
+        /// </summary>
+        public TV Meta<TV>(string name)
+        {
+            if (name == null)
+                throw new ArgumentException("Meta name cannot be null.");
+
+            lock (this)
+            {
+                if (_metas == null)
+                    return default(TV);
+
+                for (int i = 0; i < _metas.Length; i += 2)
+                {
+                    if (!name.Equals(_metas[i]))
+                        continue;
+
+                    return (TV)_metas[i + 1];
+                }
+
+                return default(TV);
+            }
+        }
+
+        /// <summary>
+        /// Removes metadata by name and returns the removed value; returns default(TV) when the
+        /// name is not present.
+        /// </summary>
+        public TV RemoveMeta<TV>(string name)
+        {
+            if (name == null)
+                throw new ArgumentException("Meta name cannot be null.");
+
+            lock (this)
+            {
+                if (_metas == null)
+                    return default(TV);
+
+                for (int i = 0; i < _metas.Length; i += 2)
+                {
+                    if (!name.Equals(_metas[i]))
+                        continue;
+
+                    TV removed = (TV)_metas[i + 1];
+
+                    // Free the slot so AddMeta can reuse it.
+                    _metas[i] = null;
+                    _metas[i + 1] = null;
+
+                    return removed;
+                }
+
+                return default(TV);
+            }
+        }
+
+        /// <summary>
+        /// Commits tx in async mode.
+        /// </summary>
+        internal IFuture CommitAsync()
+        {
+            lock (this)
+            {
+                ThrowIfClosed();
+
+                var commitFut = _txs.CommitAsync(this);
+
+                CloseWhenComplete(commitFut);
+
+                return commitFut;
+            }
+        }
+
+        /// <summary>
+        /// Rolls tx back in async mode.
+        /// </summary>
+        internal IFuture RollbackAsync()
+        {
+            lock (this)
+            {
+                ThrowIfClosed();
+
+                var rollbackFut = _txs.RollbackAsync(this);
+
+                CloseWhenComplete(rollbackFut);
+
+                return rollbackFut;
+            }
+        }
+
+        /// <summary>
+        /// Transaction ID.
+        /// </summary>
+        internal long Id
+        {
+            get
+            {
+                return _id;
+            }
+        }
+
+        /// <summary>
+        /// Closes the transaction and suppresses finalization.
+        /// </summary>
+        public void Dispose()
+        {
+            try
+            {
+                Close();
+            }
+            finally
+            {
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        /// <summary>
+        /// Gets a value indicating whether this transaction is closed.
+        /// </summary>
+        internal bool IsClosed
+        {
+            get
+            {
+                return _state != null;
+            }
+        }
+
+        /// <summary>
+        /// Gets the closed exception.
+        /// </summary>
+        private InvalidOperationException GetClosedException()
+        {
+            var message = string.Format("Transaction {0} is closed, state is {1}", Id, State);
+
+            return new InvalidOperationException(message);
+        }
+
+        /// <summary>
+        /// Creates a future via provided factory if IsClosed is false; otherwise, return a future with an error.
+        /// </summary>
+        internal IFuture GetFutureOrError(Func<IFuture> operationFactory)
+        {
+            lock (this)
+            {
+                if (IsClosed)
+                    return GetExceptionFuture();
+
+                return operationFactory();
+            }
+        }
+
+        /// <summary>
+        /// Gets the future that throws an exception.
+        /// </summary>
+        private IFuture GetExceptionFuture()
+        {
+            var failed = new Future<object>();
+
+            failed.OnError(GetClosedException());
+
+            return failed;
+        }
+
+        /// <summary>
+        /// Closes the transaction and releases unmanaged resources.
+        /// </summary>
+        private void Close()
+        {
+            lock (this)
+            {
+                // Close only once: an already recorded state is kept as is.
+                if (_state == null)
+                    _state = new StateHolder((TransactionState) _txs.TxClose(this));
+            }
+        }
+
+        /// <summary>
+        /// Throws an exception if transaction is closed.
+        /// </summary>
+        private void ThrowIfClosed()
+        {
+            if (!IsClosed)
+                return;
+
+            throw GetClosedException();
+        }
+
+        /// <summary>
+        /// Closes this transaction upon future completion.
+        /// </summary>
+        private void CloseWhenComplete(IFuture fut)
+        {
+            fut.Listen(Close);
+        }
+
+        /// <summary>
+        /// Finalizer: closes the transaction if it was never disposed.
+        /// </summary>
+        // NOTE(review): Dispose() ends up in _txs.TxClose; if that call can throw, the exception
+        // escapes the finalizer - confirm whether a guard is needed here.
+        ~TransactionImpl()
+        {
+            Dispose();
+        }
+
+        /// <summary>
+        /// State holder.
+        /// </summary>
+        private class StateHolder
+        {
+            /** Current state. */
+            private readonly TransactionState _state;
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="state">State.</param>
+            public StateHolder(TransactionState state)
+            {
+                _state = state;
+            }
+
+            /// <summary>
+            /// Current state.
+            /// </summary>
+            public TransactionState State
+            {
+                get { return _state; }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionMetricsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionMetricsImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionMetricsImpl.cs
new file mode 100644
index 0000000..e2528f4
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionMetricsImpl.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Transaction metrics: commit/rollback times and counters read once from a raw reader.
+    /// </summary>
+    internal class TransactionMetricsImpl : ITransactionMetrics
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="TransactionMetricsImpl"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public TransactionMetricsImpl(IPortableRawReader reader)
+        {
+            // Values are consumed sequentially from the raw reader, so the order of these
+            // reads matters (presumably it mirrors the order used by the writing side).
+            // A missing date falls back to default(DateTime).
+            CommitTime = reader.ReadDate() ?? default(DateTime);
+            RollbackTime = reader.ReadDate() ?? default(DateTime);
+
+            TxCommits = reader.ReadInt();
+            TxRollbacks = reader.ReadInt();
+        }
+
+        /// <summary>
+        /// Gets the last time transaction was committed.
+        /// </summary>
+        public DateTime CommitTime { get; private set; }
+
+        /// <summary>
+        /// Gets the last time transaction was rolled back.
+        /// </summary>
+        public DateTime RollbackTime { get; private set; }
+
+        /// <summary>
+        /// Gets the total number of transaction commits.
+        /// </summary>
+        public int TxCommits { get; private set; }
+
+        /// <summary>
+        /// Gets the total number of transaction rollbacks.
+        /// </summary>
+        public int TxRollbacks { get; private set; }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
new file mode 100644
index 0000000..4eaa53f
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Transactions;
+    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+    /// <summary>
+    /// Transactions facade: starts transactions and forwards per-transaction operations
+    /// (commit, rollback, close, state) to the unmanaged side.
+    /// </summary>
+    internal class TransactionsImpl : PlatformTarget, ITransactions
+    {
+        /** Operation code used by the constructor to read default concurrency, isolation and timeout. */
+        private const int OpCacheConfigParameters = 1;
+
+        /** Operation code used by GetMetrics. */
+        private const int OpMetrics = 2;
+
+        /** Default concurrency, used by the parameterless TxStart. */
+        private readonly TransactionConcurrency _dfltConcurrency;
+
+        /** Default isolation, used by the parameterless TxStart. */
+        private readonly TransactionIsolation _dfltIsolation;
+
+        /** Default timeout, used by TxStart overloads that take no timeout. */
+        private readonly TimeSpan _dfltTimeout;
+
+        /** Local node ID, passed to every created TransactionImpl as its originating node. */
+        private readonly Guid _localNodeId;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="TransactionsImpl" /> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="localNodeId">Local node id.</param>
+        public TransactionsImpl(IUnmanagedTarget target, PortableMarshaller marsh,
+            Guid localNodeId) : base(target, marsh)
+        {
+            _localNodeId = localNodeId;
+
+            // The defaults are read into locals first and copied to the readonly fields afterwards.
+            var dfltConcurrency = default(TransactionConcurrency);
+            var dfltIsolation = default(TransactionIsolation);
+            var dfltTimeout = default(TimeSpan);
+
+            DoInOp(OpCacheConfigParameters, input =>
+            {
+                var raw = marsh.StartUnmarshal(input).RawReader();
+
+                dfltConcurrency = raw.ReadEnum<TransactionConcurrency>();
+                dfltIsolation = raw.ReadEnum<TransactionIsolation>();
+                dfltTimeout = TimeSpan.FromMilliseconds(raw.ReadLong());
+            });
+
+            _dfltConcurrency = dfltConcurrency;
+            _dfltIsolation = dfltIsolation;
+            _dfltTimeout = dfltTimeout;
+        }
+
+        /** <inheritDoc /> */
+        public ITransaction TxStart()
+        {
+            // All defaults, transaction size 0.
+            return TxStart(_dfltConcurrency, _dfltIsolation, _dfltTimeout, 0);
+        }
+
+        /** <inheritDoc /> */
+        public ITransaction TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation)
+        {
+            // Default timeout, transaction size 0.
+            return TxStart(concurrency, isolation, _dfltTimeout, 0);
+        }
+
+        /** <inheritDoc /> */
+        public ITransaction TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
+            TimeSpan timeout, int txSize)
+        {
+            var timeoutMs = (long)timeout.TotalMilliseconds;
+
+            var txId = UU.TransactionsStart(Target, (int)concurrency, (int)isolation, timeoutMs, txSize);
+
+            var impl = new TransactionImpl(txId, this, concurrency, isolation, timeout, _localNodeId);
+
+            return new Transaction(impl);
+        }
+
+        /** <inheritDoc /> */
+        public ITransaction Tx
+        {
+            get
+            {
+                return TransactionImpl.Current;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public ITransactionMetrics GetMetrics()
+        {
+            return DoInOp(OpMetrics, input =>
+            {
+                IPortableRawReader raw = Marshaller.StartUnmarshal(input, false);
+
+                return new TransactionMetricsImpl(raw);
+            });
+        }
+
+        /** <inheritDoc /> */
+        public void ResetMetrics()
+        {
+            UU.TransactionsResetMetrics(Target);
+        }
+
+        /// <summary>
+        /// Commit transaction.
+        /// </summary>
+        /// <param name="tx">Transaction.</param>
+        /// <returns>Final transaction state.</returns>
+        internal TransactionState TxCommit(TransactionImpl tx)
+        {
+            var nativeState = UU.TransactionsCommit(Target, tx.Id);
+
+            return (TransactionState) nativeState;
+        }
+
+        /// <summary>
+        /// Rollback transaction.
+        /// </summary>
+        /// <param name="tx">Transaction.</param>
+        /// <returns>Final transaction state.</returns>
+        internal TransactionState TxRollback(TransactionImpl tx)
+        {
+            var nativeState = UU.TransactionsRollback(Target, tx.Id);
+
+            return (TransactionState) nativeState;
+        }
+
+        /// <summary>
+        /// Close transaction.
+        /// </summary>
+        /// <param name="tx">Transaction.</param>
+        /// <returns>Final transaction state, as the raw integer returned by the unmanaged call.</returns>
+        internal int TxClose(TransactionImpl tx)
+        {
+            var nativeState = UU.TransactionsClose(Target, tx.Id);
+
+            return nativeState;
+        }
+
+        /// <summary>
+        /// Get transaction current state.
+        /// </summary>
+        /// <param name="tx">Transaction.</param>
+        /// <returns>Transaction current state.</returns>
+        internal TransactionState TxState(TransactionImpl tx)
+        {
+            var nativeState = UU.TransactionsState(Target, tx.Id);
+
+            return GetTransactionState(nativeState);
+        }
+
+        /// <summary>
+        /// Set transaction rollback-only flag.
+        /// </summary>
+        /// <param name="tx">Transaction.</param>
+        /// <returns><c>true</c> if the flag was set.</returns>
+        internal bool TxSetRollbackOnly(TransactionImpl tx)
+        {
+            var flagSet = UU.TransactionsSetRollbackOnly(Target, tx.Id);
+
+            return flagSet;
+        }
+
+        /// <summary>
+        /// Commits tx in async mode.
+        /// </summary>
+        internal IFuture CommitAsync(TransactionImpl tx)
+        {
+            // The future type argument is not needed by the commit call.
+            return GetFuture<object>((futId, futTyp) => UU.TransactionsCommitAsync(Target, tx.Id, futId));
+        }
+
+        /// <summary>
+        /// Rolls tx back in async mode.
+        /// </summary>
+        internal IFuture RollbackAsync(TransactionImpl tx)
+        {
+            // The future type argument is not needed by the rollback call.
+            return GetFuture<object>((futId, futTyp) => UU.TransactionsRollbackAsync(Target, tx.Id, futId));
+        }
+
+        /// <summary>
+        /// Gets the state of the transaction from int.
+        /// </summary>
+        /// <param name="state">Integer state value, as obtained from the unmanaged call.</param>
+        /// <returns>The same value cast to <see cref="TransactionState"/>; no range check is made.</returns>
+        private static TransactionState GetTransactionState(int state)
+        {
+            return (TransactionState)state;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IUnmanagedTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IUnmanagedTarget.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IUnmanagedTarget.cs
new file mode 100644
index 0000000..235f20d
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IUnmanagedTarget.cs
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+    using System;
+
+    /// <summary>
+    /// Disposable handle to an unmanaged object, exposed as a context pointer plus a target pointer.
+    /// </summary>
+    internal unsafe interface IUnmanagedTarget : IDisposable
+    {
+        /// <summary>
+        /// Raw pointer to the unmanaged context.
+        /// </summary>
+        void* Context { get; }
+
+        /// <summary>
+        /// Raw pointer to the unmanaged target.
+        /// </summary>
+        void* Target { get; }
+
+        /// <summary>
+        /// Creates a new instance with the same context and a different target pointer.
+        /// </summary>
+        IUnmanagedTarget ChangeTarget(void* target);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
new file mode 100644
index 0000000..07cf309
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Unmanaged
+{
+    using System.Runtime.InteropServices;
+
+    /// <summary>
+    /// Unmanaged callback handler function pointers.
+    /// </summary>
+    /// <remarks>
+    /// The struct is declared with <see cref="LayoutKind.Sequential"/>, so the declaration order of
+    /// the fields below is part of its layout: do not reorder, insert or remove fields casually.
+    /// NOTE(review): presumably this layout is mirrored by the native side - confirm before changing it.
+    /// The blank lines only group the pointers by name prefix.
+    /// </remarks>
+    [StructLayout(LayoutKind.Sequential)]
+    internal unsafe struct UnmanagedCallbackHandlers
+    {
+        // NOTE(review): by its name this is not a callback itself; presumably an opaque pointer
+        // handed back to the handlers - confirm against the code that fills this struct.
+        internal void* target;
+
+        // cacheStore* group.
+        internal void* cacheStoreCreate;
+        internal void* cacheStoreInvoke;
+        internal void* cacheStoreDestroy;
+        internal void* cacheStoreSessionCreate;
+
+        // cacheEntryFilter* group.
+        internal void* cacheEntryFilterCreate;
+        internal void* cacheEntryFilterApply;
+        internal void* cacheEntryFilterDestroy;
+
+        internal void* cacheInvoke;
+
+        // computeTask* and computeJob* group.
+        internal void* computeTaskMap;
+        internal void* computeTaskJobResult;
+        internal void* computeTaskReduce;
+        internal void* computeTaskComplete;
+        internal void* computeJobSerialize;
+        internal void* computeJobCreate;
+        internal void* computeJobExecute;
+        internal void* computeJobCancel;
+        internal void* computeJobDestroy;
+
+        // continuousQuery* group.
+        internal void* continuousQueryListenerApply;
+        internal void* continuousQueryFilterCreate;
+        internal void* continuousQueryFilterApply;
+        internal void* continuousQueryFilterRelease;
+
+        // dataStreamer* group.
+        internal void* dataStreamerTopologyUpdate;
+        internal void* dataStreamerStreamReceiverInvoke;
+
+        // future* group: one result pointer per result kind, then the error pointer.
+        internal void* futureByteResult;
+        internal void* futureBoolResult;
+        internal void* futureShortResult;
+        internal void* futureCharResult;
+        internal void* futureIntResult;
+        internal void* futureFloatResult;
+        internal void* futureLongResult;
+        internal void* futureDoubleResult;
+        internal void* futureObjectResult;
+        internal void* futureNullResult;
+        internal void* futureError;
+
+        internal void* lifecycleOnEvent;
+
+        internal void* memoryReallocate;
+
+        // messagingFilter* group.
+        internal void* messagingFilterCreate;
+        internal void* messagingFilterApply;
+        internal void* messagingFilterDestroy;
+
+        // eventFilter* group.
+        internal void* eventFilterCreate;
+        internal void* eventFilterApply;
+        internal void* eventFilterDestroy;
+
+        // service* group.
+        internal void* serviceInit;
+        internal void* serviceExecute;
+        internal void* serviceCancel;
+        internal void* serviceInvokeMethod;
+
+        internal void* clusterNodeFilterApply;
+
+        internal void* nodeInfo;
+
+        // onStart / onStop / error group.
+        internal void* onStart;
+        internal void* onStop;
+        internal void* error;
+
+        // extensionCb* group.
+        internal void* extensionCbInLongOutLong;
+        internal void* extensionCbInLongLongOutLong;
+    }
+}
