IGNITE-6272 .NET: Multiple services deployment

This closes #2813
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d906b33 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d906b33 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d906b33 Branch: refs/heads/ignite-zk Commit: 1d906b330c28a1a4fbb7057a25d96bff3d7646a0 Parents: f50b235 Author: Alexey Popov <[email protected]> Authored: Fri Nov 24 19:27:09 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Fri Nov 24 19:27:09 2017 +0300 ---------------------------------------------------------------------- .../cluster/PlatformClusterNodeFilterImpl.java | 7 + .../dotnet/PlatformDotNetServiceImpl.java | 7 + .../platform/services/PlatformServices.java | 126 ++++++++++++- .../ignite/platform/PlatformExceptionTask.java | 8 + .../Apache.Ignite.Core.Tests/ExceptionsTest.cs | 2 + .../Services/ServicesAsyncWrapper.cs | 19 ++ .../Services/ServicesTest.cs | 181 ++++++++++++++++++- .../Impl/Binary/BinaryReaderExtensions.cs | 23 ++- .../Impl/Binary/BinaryUtils.cs | 6 +- .../Impl/Services/ServiceProxySerializer.cs | 20 +- .../Impl/Services/Services.cs | 96 +++++++--- .../Apache.Ignite.Core/Services/IServices.cs | 32 ++++ .../Services/ServiceConfiguration.cs | 70 ++++++- .../Services/ServiceDeploymentException.cs | 38 +++- 14 files changed, 589 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java index bebff8e..9d52c4a 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java @@ -75,4 +75,11 @@ public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate imp public void setIgniteInstance(Ignite ignite) { ctx = PlatformUtils.platformContext(ignite); } + + /** + * @return Filter itself + */ + public Object getInternalPredicate() { + return pred; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java index 1eb9a2c..8730940 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java @@ -44,4 +44,11 @@ public class PlatformDotNetServiceImpl extends PlatformAbstractService implement public PlatformDotNetServiceImpl(Object svc, PlatformContext ctx, boolean srvKeepBinary) { super(svc, ctx, srvKeepBinary); } + + /** + * @return Service itself + */ + public Object getInternalService() { + return svc; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 6f8d9e5..ccb04d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformTarget; +import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilterImpl; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; @@ -37,6 +38,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceDeploymentException; import org.apache.ignite.services.ServiceDescriptor; import org.jetbrains.annotations.NotNull; @@ -96,6 +98,12 @@ public class PlatformServices extends PlatformAbstractTarget { private static final int OP_CANCEL_ALL_ASYNC = 14; /** */ + private static final int OP_DOTNET_DEPLOY_ALL = 15; + + /** */ + private static final int OP_DOTNET_DEPLOY_ALL_ASYNC = 16; + + /** */ private static final byte PLATFORM_JAVA = 0; /** */ @@ -178,6 +186,12 @@ public class PlatformServices extends PlatformAbstractTarget { return TRUE; } + case OP_DOTNET_DEPLOY_ALL_ASYNC: { + readAndListenFuture(reader, dotnetDeployAllAsync(reader, services), RESULT_WRITER); + + return TRUE; + } + default: return super.processInStreamOutLong(type, 
reader); } @@ -209,11 +223,10 @@ public class PlatformServices extends PlatformAbstractTarget { case OP_DOTNET_DEPLOY: { try { dotnetDeploy(reader, services); - - PlatformUtils.writeInvocationResult(writer, null, null); + writeDeploymentResult(writer, null); } catch (Exception e) { - PlatformUtils.writeInvocationResult(writer, null, e); + writeDeploymentResult(writer, e); } return; @@ -223,10 +236,23 @@ public class PlatformServices extends PlatformAbstractTarget { try { dotnetDeployMultiple(reader); - PlatformUtils.writeInvocationResult(writer, null, null); + writeDeploymentResult(writer, null); } catch (Exception e) { - PlatformUtils.writeInvocationResult(writer, null, e); + writeDeploymentResult(writer, e); + } + + return; + } + + case OP_DOTNET_DEPLOY_ALL: { + try { + dotnetDeployAll(reader, services); + + writeDeploymentResult(writer, null); + } + catch (Exception e) { + writeDeploymentResult(writer, e); } return; @@ -421,6 +447,31 @@ public class PlatformServices extends PlatformAbstractTarget { } /** + * Deploys a collection of dotnet services. + * + * @param reader Binary reader. + * @param services Services. + */ + private void dotnetDeployAll(BinaryRawReaderEx reader, IgniteServices services) { + Collection<ServiceConfiguration> cfgs = dotnetConfigurations(reader); + + services.deployAll(cfgs); + } + + /** + * Deploys a collection of dotnet services asynchronously. + * + * @param reader Binary reader. + * @param services Services. + * @return Future of the operation. + */ + private IgniteFuture<Void> dotnetDeployAllAsync(BinaryRawReaderEx reader, IgniteServices services) { + Collection<ServiceConfiguration> cfgs = dotnetConfigurations(reader); + + return services.deployAllAsync(cfgs); + } + + /** * Read the dotnet service configuration. * * @param reader Binary reader, @@ -445,6 +496,24 @@ public class PlatformServices extends PlatformAbstractTarget { } /** + * Reads the collection of dotnet service configurations. 
+ * + * @param reader Binary reader, + * @return Service configuration. + */ + @NotNull private Collection<ServiceConfiguration> dotnetConfigurations(BinaryRawReaderEx reader) { + int numServices = reader.readInt(); + + List<ServiceConfiguration> cfgs = new ArrayList<>(numServices); + + for (int i = 0; i < numServices; i++) { + cfgs.add(dotnetConfiguration(reader)); + } + + return cfgs; + } + + /** * Proxy holder. */ @SuppressWarnings("unchecked") @@ -646,7 +715,7 @@ public class PlatformServices extends PlatformAbstractTarget { private static class ServiceDeploymentResultWriter implements PlatformFutureUtils.Writer { /** <inheritDoc /> */ @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { - PlatformUtils.writeInvocationResult(writer, obj, err); + writeDeploymentResult(writer, err); } /** <inheritDoc /> */ @@ -655,4 +724,49 @@ public class PlatformServices extends PlatformAbstractTarget { } } + /** + * Writes a service deployment result for dotnet code. + * + * @param writer Writer. + * @param err Error. + */ + private static void writeDeploymentResult(BinaryRawWriterEx writer, Throwable err) { + PlatformUtils.writeInvocationResult(writer, null, err); + + Collection<ServiceConfiguration> failedCfgs = null; + + if (err instanceof ServiceDeploymentException) + failedCfgs = ((ServiceDeploymentException)err).getFailedConfigurations(); + + // write a collection of failed service configurations + PlatformUtils.writeNullableCollection(writer, failedCfgs, new PlatformWriterClosure<ServiceConfiguration>() { + @Override public void write(BinaryRawWriterEx writer, ServiceConfiguration svcCfg) { + writeFailedConfiguration(writer, svcCfg); + } + }); + } + + /** + * Writes a failed service configuration for dotnet code. 
+ * + * @param w Writer + * @param svcCfg Service configuration + */ + private static void writeFailedConfiguration(BinaryRawWriterEx w, ServiceConfiguration svcCfg) { + Object dotnetSvc = null; + Object dotnetFilter = null; + w.writeString(svcCfg.getName()); + if (svcCfg.getService() instanceof PlatformDotNetServiceImpl) + dotnetSvc = ((PlatformDotNetServiceImpl)svcCfg.getService()).getInternalService(); + + w.writeObjectDetached(dotnetSvc); + w.writeInt(svcCfg.getTotalCount()); + w.writeInt(svcCfg.getMaxPerNodeCount()); + w.writeString(svcCfg.getCacheName()); + w.writeObjectDetached(svcCfg.getAffinityKey()); + + if (svcCfg.getNodeFilter() instanceof PlatformClusterNodeFilterImpl) + dotnetFilter = ((PlatformClusterNodeFilterImpl)svcCfg.getNodeFilter()).getInternalPredicate(); + w.writeObjectDetached(dotnetFilter); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java index c1ab991..0962fc5 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformExceptionTask.java @@ -25,6 +25,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.compute.*; import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceDeploymentException; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.Nullable; @@ -32,16 +34,20 @@ import javax.cache.CacheException; import javax.cache.integration.CacheLoaderException; import 
javax.cache.integration.CacheWriterException; import javax.cache.processor.EntryProcessorException; +import java.util.Collections; import java.util.List; import java.util.Map; /** * Task to test exception mappings. */ +@SuppressWarnings("unused") // Used by .NET ExceptionsTest. public class PlatformExceptionTask extends ComputeTaskAdapter<String, String> { /** {@inheritDoc} */ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable String arg) { + assert arg != null; + switch (arg) { case "IllegalArgumentException": throw new IllegalArgumentException(arg); case "IllegalStateException": throw new IllegalStateException(arg); @@ -66,6 +72,8 @@ public class PlatformExceptionTask extends ComputeTaskAdapter<String, String> { case "TransactionHeuristicException": throw new TransactionHeuristicException(arg); case "TransactionDeadlockException": throw new TransactionDeadlockException(arg); case "IgniteFutureCancelledException": throw new IgniteFutureCancelledException(arg); + case "ServiceDeploymentException": throw new ServiceDeploymentException(arg, + Collections.singletonList(new ServiceConfiguration().setName("foo"))); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs index d84e6dc..f7568ef 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ExceptionsTest.cs @@ -29,6 +29,7 @@ namespace Apache.Ignite.Core.Tests using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Services; using Apache.Ignite.Core.Transactions; using NUnit.Framework; @@ -93,6 +94,7 @@ 
namespace Apache.Ignite.Core.Tests CheckException<TransactionHeuristicException>(comp, "TransactionHeuristicException"); CheckException<TransactionDeadlockException>(comp, "TransactionDeadlockException"); CheckException<IgniteFutureCancelledException>(comp, "IgniteFutureCancelledException"); + CheckException<ServiceDeploymentException>(comp, "ServiceDeploymentException"); // Check stopped grid. grid.Dispose(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs index 470804c..18db5d5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesAsyncWrapper.cs @@ -121,6 +121,25 @@ namespace Apache.Ignite.Core.Tests.Services } /** <inheritDoc /> */ + public void DeployAll(IEnumerable<ServiceConfiguration> configurations) + { + try + { + _services.DeployAllAsync(configurations).Wait(); + } + catch (AggregateException ex) + { + throw ex.InnerException; + } + } + + /** <inheritDoc /> */ + public Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations) + { + return _services.DeployAllAsync(configurations); + } + + /** <inheritDoc /> */ public void Cancel(string name) { _services.CancelAsync(name).Wait(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs index e2b3a09..d3dd9b0 100644 --- 
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/ServicesTest.cs @@ -21,12 +21,13 @@ namespace Apache.Ignite.Core.Tests.Services using System.Collections; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; + using System.IO; using System.Linq; + using System.Runtime.Serialization.Formatters.Binary; using System.Threading; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; - //using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Resource; using Apache.Ignite.Core.Services; using Apache.Ignite.Core.Tests.Compute; @@ -82,7 +83,7 @@ namespace Apache.Ignite.Core.Tests.Services { try { - Services.Cancel(SvcName); + Services.CancelAll(); TestUtils.AssertHandleRegistryIsEmpty(1000, Grid1, Grid2, Grid3); } @@ -123,6 +124,35 @@ namespace Apache.Ignite.Core.Tests.Services } /// <summary> + /// Tests several services deployment via DeployAll() method. + /// </summary> + [Test] + public void TestDeployAll([Values(true, false)] bool binarizable) + { + const int num = 10; + + var cfgs = new List<ServiceConfiguration>(); + for (var i = 0; i < num; i++) + { + cfgs.Add(new ServiceConfiguration + { + Name = MakeServiceName(i), + MaxPerNodeCount = 3, + TotalCount = 3, + NodeFilter = new NodeFilter {NodeId = Grid1.GetCluster().GetLocalNode().Id}, + Service = binarizable ? new TestIgniteServiceBinarizable() : new TestIgniteServiceSerializable() + }); + } + + Services.DeployAll(cfgs); + + for (var i = 0; i < num; i++) + { + CheckServiceStarted(Grid1, 3, MakeServiceName(i)); + } + } + + /// <summary> /// Tests cluster singleton deployment. /// </summary> [Test] @@ -455,6 +485,132 @@ namespace Apache.Ignite.Core.Tests.Services } /// <summary> + /// Tests ServiceDeploymentException result via DeployAll() method. 
+ /// </summary> + [Test] + public void TestDeployAllException([Values(true, false)] bool binarizable) + { + const int num = 10; + const int firstFailedIdx = 1; + const int secondFailedIdx = 9; + + var cfgs = new List<ServiceConfiguration>(); + for (var i = 0; i < num; i++) + { + var throwInit = (i == firstFailedIdx || i == secondFailedIdx); + cfgs.Add(new ServiceConfiguration + { + Name = MakeServiceName(i), + MaxPerNodeCount = 2, + TotalCount = 2, + NodeFilter = new NodeFilter { NodeId = Grid1.GetCluster().GetLocalNode().Id }, + Service = binarizable ? new TestIgniteServiceBinarizable { TestProperty = i, ThrowInit = throwInit } + : new TestIgniteServiceSerializable { TestProperty = i, ThrowInit = throwInit } + }); + } + + var deploymentException = Assert.Throws<ServiceDeploymentException>(() => Services.DeployAll(cfgs)); + + var failedCfgs = deploymentException.FailedConfigurations; + Assert.IsNotNull(failedCfgs); + Assert.AreEqual(2, failedCfgs.Count); + + var firstFailedSvc = binarizable ? failedCfgs.ElementAt(0).Service as TestIgniteServiceBinarizable : + failedCfgs.ElementAt(0).Service as TestIgniteServiceSerializable; + var secondFailedSvc = binarizable ? failedCfgs.ElementAt(1).Service as TestIgniteServiceBinarizable : + failedCfgs.ElementAt(1).Service as TestIgniteServiceSerializable; + + Assert.IsNotNull(firstFailedSvc); + Assert.IsNotNull(secondFailedSvc); + + Assert.AreEqual(firstFailedIdx, firstFailedSvc.TestProperty); + Assert.AreEqual(secondFailedIdx, secondFailedSvc.TestProperty); + + for (var i = 0; i < num; i++) + { + if (i != firstFailedIdx && i != secondFailedIdx) + { + CheckServiceStarted(Grid1, 2, MakeServiceName(i)); + } + } + } + + /// <summary> + /// Tests input errors for DeployAll() method. 
+ /// </summary> + [Test] + public void TestDeployAllInputErrors() + { + var nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(null)); + Assert.IsTrue(nullException.Message.Contains("configurations")); + + var argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration>())); + Assert.IsTrue(argException.Message.Contains("empty collection")); + + nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(new List<ServiceConfiguration> { null })); + Assert.IsTrue(nullException.Message.Contains("configurations[0]")); + + nullException = Assert.Throws<ArgumentNullException>(() => Services.DeployAll(new List<ServiceConfiguration> + { + new ServiceConfiguration { Name = SvcName } + })); + Assert.IsTrue(nullException.Message.Contains("configurations[0].Service")); + + argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration> + { + new ServiceConfiguration { Service = new TestIgniteServiceSerializable() } + })); + Assert.IsTrue(argException.Message.Contains("configurations[0].Name")); + + argException = Assert.Throws<ArgumentException>(() => Services.DeployAll(new List<ServiceConfiguration> + { + new ServiceConfiguration { Service = new TestIgniteServiceSerializable(), Name = string.Empty } + })); + Assert.IsTrue(argException.Message.Contains("configurations[0].Name")); + } + + /// <summary> + /// Tests [Serializable] usage of ServiceDeploymentException. 
+ /// </summary> + [Test] + public void TestDeploymentExceptionSerializable() + { + var cfg = new ServiceConfiguration + { + Name = "foo", + CacheName = "cacheName", + AffinityKey = 1, + MaxPerNodeCount = 2, + Service = new TestIgniteServiceSerializable(), + NodeFilter = new NodeFilter(), + TotalCount = 3 + }; + + var ex = new ServiceDeploymentException("msg", new Exception("in"), new[] {cfg}); + + var formatter = new BinaryFormatter(); + var stream = new MemoryStream(); + formatter.Serialize(stream, ex); + stream.Seek(0, SeekOrigin.Begin); + + var res = (ServiceDeploymentException) formatter.Deserialize(stream); + + Assert.AreEqual(ex.Message, res.Message); + Assert.IsNotNull(res.InnerException); + Assert.AreEqual("in", res.InnerException.Message); + + var resCfg = res.FailedConfigurations.Single(); + + Assert.AreEqual(cfg.Name, resCfg.Name); + Assert.AreEqual(cfg.CacheName, resCfg.CacheName); + Assert.AreEqual(cfg.AffinityKey, resCfg.AffinityKey); + Assert.AreEqual(cfg.MaxPerNodeCount, resCfg.MaxPerNodeCount); + Assert.AreEqual(cfg.TotalCount, resCfg.TotalCount); + Assert.IsInstanceOf<TestIgniteServiceSerializable>(cfg.Service); + Assert.IsInstanceOf<NodeFilter>(cfg.NodeFilter); + } + + /// <summary> /// Verifies the deployment exception. 
/// </summary> private void VerifyDeploymentException(Action<IServices, IService> deploy, bool keepBinary) @@ -496,6 +652,10 @@ namespace Apache.Ignite.Core.Tests.Services Assert.IsTrue(ex.StackTrace.Trim().StartsWith( "at Apache.Ignite.Core.Tests.Services.ServicesTest.TestIgniteServiceSerializable.Init")); + var failedCfgs = deploymentException.FailedConfigurations; + Assert.IsNotNull(failedCfgs); + Assert.AreEqual(1, failedCfgs.Count); + var svc0 = Services.GetService<TestIgniteServiceSerializable>(SvcName); Assert.IsNull(svc0); } @@ -582,6 +742,8 @@ namespace Apache.Ignite.Core.Tests.Services Grid1.GetCompute() .ExecuteJavaTask<object>("org.apache.ignite.platform.PlatformDeployServiceTask", javaSvcName); + TestUtils.WaitForCondition(() => Services.GetServiceDescriptors().Any(x => x.Name == javaSvcName), 1000); + // Verify decriptor var descriptor = Services.GetServiceDescriptors().Single(x => x.Name == javaSvcName); Assert.AreEqual(javaSvcName, descriptor.Name); @@ -707,10 +869,10 @@ namespace Apache.Ignite.Core.Tests.Services /// <summary> /// Checks that service has started on specified grid. /// </summary> - private static void CheckServiceStarted(IIgnite grid, int count = 1) + private static void CheckServiceStarted(IIgnite grid, int count = 1, string svcName = SvcName) { Func<ICollection<TestIgniteServiceSerializable>> getServices = () => - grid.GetServices().GetServices<TestIgniteServiceSerializable>(SvcName); + grid.GetServices().GetServices<TestIgniteServiceSerializable>(svcName); Assert.IsTrue(TestUtils.WaitForCondition(() => count == getServices().Count, 5000)); @@ -779,6 +941,15 @@ namespace Apache.Ignite.Core.Tests.Services protected virtual bool CompactFooter { get { return true; } } /// <summary> + /// Makes Service1-{i} names for services. + /// </summary> + private static string MakeServiceName(int i) + { + // Please note that CheckContext() validates Name.StartsWith(SvcName). 
+ return string.Format("{0}-{1}", SvcName, i); + } + + /// <summary> /// Test service interface for proxying. /// </summary> private interface ITestIgniteService @@ -961,11 +1132,13 @@ namespace Apache.Ignite.Core.Tests.Services public void WriteBinary(IBinaryWriter writer) { writer.WriteInt("TestProp", TestProperty); + writer.WriteBoolean("ThrowInit", ThrowInit); } /** <inheritdoc /> */ public void ReadBinary(IBinaryReader reader) { + ThrowInit = reader.ReadBoolean("ThrowInit"); TestProperty = reader.ReadInt("TestProp"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs index de0277b..db2f84f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs @@ -100,7 +100,9 @@ namespace Apache.Ignite.Core.Impl.Binary } /// <summary> - /// Reads the collection. + /// Reads the collection. The collection could be produced by Java PlatformUtils.writeCollection() + /// from org.apache.ignite.internal.processors.platform.utils package + /// Note: return null if collection is empty /// </summary> public static ICollection<T> ReadCollectionRaw<T, TReader>(this TReader reader, Func<TReader, T> factory) where TReader : IBinaryRawReader @@ -142,5 +144,24 @@ namespace Apache.Ignite.Core.Impl.Binary return res; } + + /// <summary> + /// Reads a nullable collection. The collection could be produced by Java + /// PlatformUtils.writeNullableCollection() from org.apache.ignite.internal.processors.platform.utils package. 
+ /// </summary> + public static ICollection<T> ReadNullableCollectionRaw<T, TReader>(this TReader reader, + Func<TReader, T> factory) where TReader : IBinaryRawReader + { + Debug.Assert(reader != null); + Debug.Assert(factory != null); + + var hasVal = reader.ReadBoolean(); + + if (!hasVal) + { + return null; + } + return ReadCollectionRaw(reader, factory); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs index 3123c07..4e4d327 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs @@ -1559,10 +1559,10 @@ namespace Apache.Ignite.Core.Impl.Binary { err = null; - if (reader.ReadBoolean()) - return reader.ReadObject<object>(); + if (reader.ReadBoolean()) // success indication + return reader.ReadObject<object>(); - err = reader.ReadBoolean() + err = reader.ReadBoolean() // native error indication ? 
reader.ReadObject<object>() : ExceptionUtils.GetException(reader.Marshaller.Ignite, reader.ReadString(), reader.ReadString(), reader.ReadString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs index fc6009a..422908f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/ServiceProxySerializer.cs @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Services { using System; using System.Collections; + using System.Collections.Generic; using System.Diagnostics; using System.Reflection; using Apache.Ignite.Core.Binary; @@ -185,13 +186,28 @@ namespace Apache.Ignite.Core.Impl.Services return; } + // read failed configurations + ICollection<ServiceConfiguration> failedCfgs; + + try + { + // switch to BinaryMode.Deserialize mode to avoid IService casting exception + reader = marsh.StartUnmarshal(stream); + failedCfgs = reader.ReadNullableCollectionRaw(f => new ServiceConfiguration(f)); + } + catch (Exception e) + { + throw new ServiceDeploymentException("Service deployment failed with an exception. " + + "Examine InnerException for details.", e); + } + var binErr = err as IBinaryObject; throw binErr != null ? new ServiceDeploymentException("Service deployment failed with a binary error. " + - "Examine BinaryCause for details.", binErr) + "Examine BinaryCause for details.", binErr, failedCfgs) : new ServiceDeploymentException("Service deployment failed with an exception. 
" + - "Examine InnerException for details.", (Exception) err); + "Examine InnerException for details.", (Exception) err, failedCfgs); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs index fca3425..a9aea66 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Services/Services.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Services using System.Linq; using System.Reflection; using System.Threading.Tasks; - using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; @@ -34,6 +33,11 @@ namespace Apache.Ignite.Core.Impl.Services /// </summary> internal sealed class Services : PlatformTargetAdapter, IServices { + /* + * Please keep the following constants in sync with + * \modules\core\src\main\java\org\apache\ignite\internal\processors\platform\services\PlatformServices.java + */ + /** */ private const int OpDeploy = 1; @@ -74,6 +78,12 @@ namespace Apache.Ignite.Core.Impl.Services private const int OpCancelAllAsync = 14; /** */ + private const int OpDeployAll = 15; + + /** */ + private const int OpDeployAllAsync = 16; + + /** */ private readonly IClusterGroup _clusterGroup; /** Invoker binary flag. 
*/ @@ -229,17 +239,34 @@ namespace Apache.Ignite.Core.Impl.Services /** <inheritDoc /> */ public void Deploy(ServiceConfiguration configuration) { - IgniteArgumentCheck.NotNull(configuration, "configuration"); + ValidateConfiguration(configuration, "configuration"); - DoOutInOp(OpDeploy, w => WriteServiceConfiguration(configuration, w), ReadDeploymentResult); + DoOutInOp(OpDeploy, w => configuration.Write(w), ReadDeploymentResult); } /** <inheritDoc /> */ public Task DeployAsync(ServiceConfiguration configuration) { - IgniteArgumentCheck.NotNull(configuration, "configuration"); + ValidateConfiguration(configuration, "configuration"); + + return DoOutOpAsync(OpDeployAsync, w => configuration.Write(w), + _keepBinary, ReadDeploymentResult); + } + + /** <inheritDoc /> */ + public void DeployAll(IEnumerable<ServiceConfiguration> configurations) + { + IgniteArgumentCheck.NotNull(configurations, "configurations"); - return DoOutOpAsync(OpDeployAsync, w => WriteServiceConfiguration(configuration, w), + DoOutInOp(OpDeployAll, w => SerializeConfigurations(configurations, w), ReadDeploymentResult); + } + + /** <inheritDoc /> */ + public Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations) + { + IgniteArgumentCheck.NotNull(configurations, "configurations"); + + return DoOutOpAsync(OpDeployAllAsync, w => SerializeConfigurations(configurations, w), _keepBinary, ReadDeploymentResult); } @@ -381,27 +408,6 @@ namespace Apache.Ignite.Core.Impl.Services } /// <summary> - /// Writes the service configuration. 
- /// </summary> - private static void WriteServiceConfiguration(ServiceConfiguration configuration, IBinaryRawWriter w) - { - Debug.Assert(configuration != null); - Debug.Assert(w != null); - - w.WriteString(configuration.Name); - w.WriteObject(configuration.Service); - w.WriteInt(configuration.TotalCount); - w.WriteInt(configuration.MaxPerNodeCount); - w.WriteString(configuration.CacheName); - w.WriteObject(configuration.AffinityKey); - - if (configuration.NodeFilter != null) - w.WriteObject(configuration.NodeFilter); - else - w.WriteObject<object>(null); - } - - /// <summary> /// Reads the deployment result. /// </summary> private object ReadDeploymentResult(BinaryReader r) @@ -417,5 +423,43 @@ namespace Apache.Ignite.Core.Impl.Services ServiceProxySerializer.ReadDeploymentResult(s, Marshaller, _keepBinary); return null; } + + /// <summary> + /// Performs ServiceConfiguration validation. + /// </summary> + /// <param name="configuration">Service configuration</param> + /// <param name="argName">argument name</param> + private static void ValidateConfiguration(ServiceConfiguration configuration, string argName) + { + IgniteArgumentCheck.NotNull(configuration, argName); + IgniteArgumentCheck.NotNullOrEmpty(configuration.Name, string.Format("{0}.Name", argName)); + IgniteArgumentCheck.NotNull(configuration.Service, string.Format("{0}.Service", argName)); + } + + /// <summary> + /// Writes a collection of service configurations using the passed BinaryWriter. + /// Also performs basic validation of each service configuration and may throw exceptions. + /// </summary> + /// <param name="configurations">a collection of service configurations </param> + /// <param name="writer">Binary Writer</param> + private static void SerializeConfigurations(IEnumerable<ServiceConfiguration> configurations, + BinaryWriter writer) + { + var pos = writer.Stream.Position; + writer.WriteInt(0); // Reserve count. 
+ + var cnt = 0; + + foreach (var cfg in configurations) + { + ValidateConfiguration(cfg, string.Format("configurations[{0}]", cnt)); + cfg.Write(writer); + cnt++; + } + + IgniteArgumentCheck.Ensure(cnt > 0, "configurations", "empty collection"); + + writer.Stream.WriteInt(pos, cnt); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs index ac9b4d9..bcbd4fb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Services/IServices.cs @@ -156,6 +156,38 @@ namespace Apache.Ignite.Core.Services Task DeployAsync(ServiceConfiguration configuration); /// <summary> + /// Deploys multiple services described by provided configurations. Depending on specified parameters, + /// multiple instances of the same service may be deployed. Whenever topology changes, + /// Ignite will automatically rebalance the deployed services within cluster to make sure that each node + /// will end up with about equal number of deployed instances whenever possible. + /// <para/> + /// If deployment of some of the provided services fails, then <see cref="ServiceDeploymentException"/> + /// containing a list of failed service configurations + /// (<see cref="ServiceDeploymentException.FailedConfigurations"/>) will be thrown. It is guaranteed that all + /// services that were provided to this method and are not present in the list of failed services are + /// successfully deployed by the moment of the exception being thrown. + /// Note that if exception is thrown, then partial deployment may have occurred. 
+ /// </summary> + /// <param name="configurations">Collection of service configurations to be deployed.</param> + void DeployAll(IEnumerable<ServiceConfiguration> configurations); + + /// <summary> + /// Asynchronously deploys multiple services described by provided configurations. Depending on specified + /// parameters, multiple instances of the same service may be deployed (<see cref="ServiceConfiguration"/>). + /// Whenever topology changes, Ignite will automatically rebalance the deployed services within cluster to make + /// sure that each node will end up with about equal number of deployed instances whenever possible. + /// <para/> + /// If deployment of some of the provided services fails, then <see cref="ServiceDeploymentException"/> + /// containing a list of failed service configurations + /// (<see cref="ServiceDeploymentException.FailedConfigurations"/>) will be thrown. It is guaranteed that all + /// services, that were provided to this method and are not present in the list of failed services, are + /// successfully deployed by the moment of the exception being thrown. + /// Note that if exception is thrown, then partial deployment may have occurred. + /// </summary> + /// <param name="configurations">Collection of service configurations to be deployed.</param> + Task DeployAllAsync(IEnumerable<ServiceConfiguration> configurations); + + /// <summary> /// Cancels service deployment. If a service with specified name was deployed on the grid, /// then <see cref="IService.Cancel"/> method will be called on it. 
/// <para/> http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs index e91656f..a7b9e7f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceConfiguration.cs @@ -17,11 +17,16 @@ namespace Apache.Ignite.Core.Services { + using System; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; /// <summary> /// Service configuration. /// </summary> + [Serializable] public class ServiceConfiguration { /// <summary> @@ -57,6 +62,69 @@ namespace Apache.Ignite.Core.Services /// <summary> /// Gets or sets node filter used to filter nodes on which the service will be deployed. /// </summary> - public IClusterNodeFilter NodeFilter { get; set; } + public IClusterNodeFilter NodeFilter { get; set; } + + /// <summary> + /// Serializes the Service configuration using IBinaryRawWriter + /// </summary> + /// <param name="w">IBinaryRawWriter</param> + internal void Write(IBinaryRawWriter w) + { + Debug.Assert(w != null); + + w.WriteString(Name); + w.WriteObject(Service); + w.WriteInt(TotalCount); + w.WriteInt(MaxPerNodeCount); + w.WriteString(CacheName); + w.WriteObject(AffinityKey); + + if (NodeFilter != null) + w.WriteObject(NodeFilter); + else + w.WriteObject<object>(null); + } + + /// <summary> + /// Default constructor + /// </summary> + public ServiceConfiguration() + { + // No-op. + } + + /// <summary> + /// Deserialization constructor. 
Used to collect FailedConfigurations during ServiceDeploymentException + /// </summary> + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] + internal ServiceConfiguration(IBinaryRawReader r) + { + Debug.Assert(r != null); + + Name = r.ReadString(); + + try + { + Service = r.ReadObject<IService>(); + } + catch (Exception) + { + // Ignore exceptions in user deserialization code. + } + + TotalCount = r.ReadInt(); + MaxPerNodeCount = r.ReadInt(); + CacheName = r.ReadString(); + AffinityKey = r.ReadObject<object>(); + + try + { + NodeFilter = r.ReadObject<IClusterNodeFilter>(); + } + catch (Exception) + { + // Ignore exceptions in user deserialization code. + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1d906b33/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs index 825f91e..6a4b18f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Services/ServiceDeploymentException.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Services { using System; + using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Runtime.Serialization; using Apache.Ignite.Core.Binary; @@ -29,12 +30,18 @@ namespace Apache.Ignite.Core.Services [Serializable] public class ServiceDeploymentException : IgniteException { - /** Serializer key. */ + /** Serializer key for BinaryCause. */ private const string KeyBinaryCause = "BinaryCause"; + /** Serializer key for Failed Configurations. */ + private const string KeyFailedConfigurations = "FailedConfigurations"; + /** Cause. 
*/ private readonly IBinaryObject _binaryCause; + /** Configurations of services that failed to deploy */ + private readonly ICollection<ServiceConfiguration> _failedCfgs; + /// <summary> /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class. /// </summary> @@ -63,14 +70,28 @@ namespace Apache.Ignite.Core.Services } /// <summary> + /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class with failed configurations. + /// </summary> + /// <param name="message">The message.</param> + /// <param name="cause">The cause.</param> + /// <param name="failedCfgs">List of failed configurations</param> + public ServiceDeploymentException(string message, Exception cause, ICollection<ServiceConfiguration> failedCfgs) + : base(message, cause) + { + _failedCfgs = failedCfgs; + } + + /// <summary> /// Initializes a new instance of the <see cref="ServiceDeploymentException"/> class. /// </summary> /// <param name="message">The message.</param> /// <param name="binaryCause">The binary cause.</param> - public ServiceDeploymentException(string message, IBinaryObject binaryCause) - : base(message) + /// <param name="failedCfgs">List of failed configurations</param> + public ServiceDeploymentException(string message, IBinaryObject binaryCause, + ICollection<ServiceConfiguration> failedCfgs) : base(message) { _binaryCause = binaryCause; + _failedCfgs = failedCfgs; } /// <summary> @@ -82,6 +103,8 @@ namespace Apache.Ignite.Core.Services : base(info, ctx) { _binaryCause = (IBinaryObject)info.GetValue(KeyBinaryCause, typeof(IBinaryObject)); + _failedCfgs = (ICollection<ServiceConfiguration>)info.GetValue(KeyFailedConfigurations, + typeof(ICollection<ServiceConfiguration>)); } /// <summary> @@ -104,8 +127,17 @@ namespace Apache.Ignite.Core.Services public override void GetObjectData(SerializationInfo info, StreamingContext context) { info.AddValue(KeyBinaryCause, _binaryCause); + info.AddValue(KeyFailedConfigurations, _failedCfgs); 
base.GetObjectData(info, context); } + + /// <summary> + /// Configurations of services that failed to deploy, could be null + /// </summary> + public ICollection<ServiceConfiguration> FailedConfigurations + { + get { return _failedCfgs; } + } } } \ No newline at end of file
