IGNITE-3561: .NET: Improved distributed joins API for LINQ, added more tests. This closes #893. This closes #900.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a808386 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a808386 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a808386 Branch: refs/heads/ignite-3443 Commit: 9a8083860ed61dde981cb6da65de869dc4205ae0 Parents: 42f42ea Author: Pavel Tupitsyn <[email protected]> Authored: Wed Jul 27 13:42:30 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 13:42:30 2016 +0300 ---------------------------------------------------------------------- .../Cache/Query/CacheLinqTest.cs | 55 ++++++++++-- .../Cache/Query/CacheQueriesTest.cs | 29 ++++--- .../Apache.Ignite.Linq.csproj | 1 + .../Apache.Ignite.Linq/CacheExtensions.cs | 23 +---- .../Apache.Ignite.Linq/Impl/CacheQueryable.cs | 15 ++-- .../dotnet/Apache.Ignite.Linq/QueryOptions.cs | 91 ++++++++++++++++++++ 6 files changed, 165 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs index f76a74c..679f03a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs @@ -36,7 +36,6 @@ namespace Apache.Ignite.Core.Tests.Cache.Query using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cache.Query; - using Apache.Ignite.Core.Common; using Apache.Ignite.Linq; using NUnit.Framework; @@ -955,7 +954,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Query var cache = GetPersonCache(); // Check regular query - var query = (ICacheQueryable) cache.AsCacheQueryable(true, null, 999, false, true).Where(x => x.Key > 10); + var query = (ICacheQueryable) cache.AsCacheQueryable(new QueryOptions + { + Local = true, + PageSize = 999, + EnforceJoinOrder = true + }).Where(x => x.Key > 10); Assert.AreEqual(cache.Name, query.CacheName); Assert.AreEqual(cache.Ignite, query.Ignite); @@ -983,14 +987,10 @@ namespace Apache.Ignite.Core.Tests.Cache.Query Assert.IsFalse(fq.EnforceJoinOrder); // Check distributed joins flag propagation - var distrQuery = cache.AsCacheQueryable(true, null, 999, true, true).Where(x => x.Key > 10); + var distrQuery = cache.AsCacheQueryable(new QueryOptions {EnableDistributedJoins = true}) + .Where(x => x.Key > 10); query = (ICacheQueryable) distrQuery; Assert.IsTrue(query.GetFieldsQuery().EnableDistributedJoins); - - // Easy check that EnableDistributedJoins is propagated to Java: it throws an error on replicated cache - var ex = Assert.Throws<IgniteException>(() => Assert.AreEqual(0, distrQuery.ToArray().Length)); - Assert.AreEqual("Queries using distributed JOINs have to be run on partitioned cache, not on replicated.", - ex.Message); } /// <summary> @@ -1022,6 +1022,45 @@ namespace Apache.Ignite.Core.Tests.Cache.Query } /// <summary> + /// Tests the distributed joins. + /// </summary> + [Test] + public void TestDistributedJoins() + { + var ignite = Ignition.GetIgnite(); + + // Create and populate partitioned caches + var personCache = ignite.CreateCache<int, Person>(new CacheConfiguration("partitioned_persons", + new QueryEntity(typeof(int), typeof(Person)))); + + personCache.PutAll(GetSecondPersonCache().ToDictionary(x => x.Key, x => x.Value)); + + var roleCache = ignite.CreateCache<int, Role>(new CacheConfiguration("partitioned_roles", + new QueryEntity(typeof(int), typeof(Role)))); + + roleCache.PutAll(GetRoleCache().ToDictionary(x => x.Key.Foo, x => x.Value)); + + // Test non-distributed join: returns partial results + var persons = personCache.AsCacheQueryable(); + var roles = roleCache.AsCacheQueryable(); + + var res = persons.Join(roles, person => person.Key - PersonCount, role => role.Key, (person, role) => role) + .ToArray(); + + Assert.Greater(res.Length, 0); + Assert.Less(res.Length, RoleCount); + + // Test distributed join: returns complete results + persons = personCache.AsCacheQueryable(new QueryOptions {EnableDistributedJoins = true}); + roles = roleCache.AsCacheQueryable(new QueryOptions {EnableDistributedJoins = true}); + + res = persons.Join(roles, person => person.Key - PersonCount, role => role.Key, (person, role) => role) + .ToArray(); + + Assert.AreEqual(RoleCount, res.Length); + } + + /// <summary> /// Gets the person cache. /// </summary> /// <returns></returns> http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs index 7bfd202..a8ffe13 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs @@ -552,31 +552,36 @@ namespace Apache.Ignite.Core.Tests.Cache.Query [Test] public void TestDistributedJoins() { - // Easy check that EnableDistributedJoins is propagated to Java: it throws an error on replicated cache var cache = GetIgnite(0).GetOrCreateCache<int, QueryPerson>( new CacheConfiguration("replicatedCache") { - CacheMode = CacheMode.Replicated, QueryEntities = new[] { - new QueryEntity(typeof(QueryPerson)) + new QueryEntity(typeof(int), typeof(QueryPerson)) { Fields = new[] {new QueryField("age", typeof(int))} } } }); - cache[1] = new QueryPerson("Test", 150); + const int count = 100; - // Distributed joins disabled: query works - var qry = new SqlQuery(typeof(QueryPerson), "age < 50"); - Assert.AreEqual(0, cache.Query(qry).GetAll().Count); + cache.PutAll(Enumerable.Range(0, count).ToDictionary(x => x, x => new QueryPerson("Name" + x, x))); - // Distributed joins enabled: query fails - qry.EnableDistributedJoins = true; - var ex = Assert.Throws<IgniteException>(() => Assert.AreEqual(0, cache.Query(qry).GetAll().Count)); - Assert.AreEqual("Queries using distributed JOINs have to be run on partitioned cache, not on replicated.", - ex.Message); + // Test non-distributed join: returns partial results + var sql = "select T0.Age from QueryPerson as T0 " + + "inner join QueryPerson as T1 on ((? - T1.Age - 1) = T0._key)"; + + var res = cache.QueryFields(new SqlFieldsQuery(sql, count)).GetAll().Distinct().Count(); + + Assert.Greater(res, 0); + Assert.Less(res, count); + + // Test distributed join: returns complete results + res = cache.QueryFields(new SqlFieldsQuery(sql, count) {EnableDistributedJoins = true}) + .GetAll().Distinct().Count(); + + Assert.AreEqual(count, res); } /// <summary> http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Linq/Apache.Ignite.Linq.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Apache.Ignite.Linq.csproj b/modules/platforms/dotnet/Apache.Ignite.Linq/Apache.Ignite.Linq.csproj index 3abfa56..72e5f68 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Linq/Apache.Ignite.Linq.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Apache.Ignite.Linq.csproj @@ -71,6 +71,7 @@ <Compile Include="Impl\ExpressionWalker.cs" /> <Compile Include="Package-Info.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="QueryOptions.cs" /> </ItemGroup> <ItemGroup> <None Include="Apache.Ignite.Linq.nuspec" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs index e6d585c..4b536f4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs @@ -20,7 +20,6 @@ namespace Apache.Ignite.Linq using System.Linq; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; - using Apache.Ignite.Core.Cache.Query; using Apache.Ignite.Linq.Impl; /// <summary> @@ -93,7 +92,7 @@ namespace Apache.Ignite.Linq public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>( this ICache<TKey, TValue> cache, bool local, string tableName) { - return cache.AsCacheQueryable(local, tableName, SqlFieldsQuery.DfltPageSize, false, false); + return cache.AsCacheQueryable(new QueryOptions {Local = local, TableName = tableName}); } /// <summary> @@ -108,28 +107,14 @@ namespace Apache.Ignite.Linq /// <typeparam name="TKey">The type of the key.</typeparam> /// <typeparam name="TValue">The type of the value.</typeparam> /// <param name="cache">The cache.</param> - /// <param name="local">Local flag. When set query will be executed only on local node, so only local - /// entries will be returned as query result.</param> - /// <param name="tableName">Name of the table. - /// <para /> - /// Table name is equal to short class name of a cache value. - /// When a cache has only one type of values, or only one <see cref="QueryEntity" /> defined, - /// table name will be inferred and can be omitted.</param> - /// <param name="pageSize">Query cursor page size. - /// Defaults to <see cref="SqlFieldsQuery.DfltPageSize"/>.</param> - /// <param name="enableDistributedJoins">Distributed joins option, see - /// <see cref="SqlFieldsQuery.EnableDistributedJoins" />.</param> - /// <param name="enforceJoinOrder">Enforce join order flag, - /// see <see cref="SqlFieldsQuery.EnforceJoinOrder" />.</param> + /// <param name="queryOptions">The query options.</param> /// <returns> /// <see cref="IQueryable{T}" /> instance over this cache. /// </returns> public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>( - this ICache<TKey, TValue> cache, bool local, string tableName, int pageSize, - bool enableDistributedJoins, bool enforceJoinOrder) + this ICache<TKey, TValue> cache, QueryOptions queryOptions) { - return new CacheQueryable<TKey, TValue>(cache, local, tableName, pageSize, enableDistributedJoins, - enforceJoinOrder); + return new CacheQueryable<TKey, TValue>(cache, queryOptions); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs index 7ade159..7372776 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs @@ -30,17 +30,12 @@ namespace Apache.Ignite.Linq.Impl /// Initializes a new instance of the <see cref="CacheQueryable{TKey, TValue}" /> class. /// </summary> /// <param name="cache">The cache.</param> - /// <param name="local">Local flag.</param> - /// <param name="tableName">Name of the table.</param> - /// <param name="pageSize">Size of the page.</param> - /// <param name="enableDistributedJoins">Distributed joins flag.</param> - /// <param name="enforceJoinOrder">Enforce join order flag.</param> - public CacheQueryable(ICache<TKey, TValue> cache, bool local, string tableName, int pageSize, - bool enableDistributedJoins, - bool enforceJoinOrder) + /// <param name="queryOptions">The query options.</param> + public CacheQueryable(ICache<TKey, TValue> cache, QueryOptions queryOptions) : base(new CacheFieldsQueryProvider(CacheQueryParser.Instance, - new CacheFieldsQueryExecutor((ICacheInternal) cache, local, pageSize, enableDistributedJoins, - enforceJoinOrder), cache.Ignite, cache.GetConfiguration(), tableName, typeof(TValue))) + new CacheFieldsQueryExecutor((ICacheInternal) cache, queryOptions.Local, queryOptions.PageSize, + queryOptions.EnableDistributedJoins, queryOptions.EnforceJoinOrder), + cache.Ignite, cache.GetConfiguration(), queryOptions.TableName, typeof(TValue))) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/9a808386/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs new file mode 100644 index 0000000..4fe3ee5 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Linq +{ + using System.ComponentModel; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Cache.Query; + + /// <summary> + /// Cache query options. + /// </summary> + public class QueryOptions + { + /// <summary> Default page size. </summary> + public const int DefaultPageSize = SqlFieldsQuery.DfltPageSize; + + /// <summary> + /// Initializes a new instance of the <see cref="QueryOptions"/> class. + /// </summary> + public QueryOptions() + { + PageSize = DefaultPageSize; + } + + /// <summary> + /// Local flag. When set query will be executed only on local node, so only local + /// entries will be returned as query result. + /// <para /> + /// Defaults to <c>false</c>. + /// </summary> + public bool Local { get; set; } + + /// <summary> + /// Page size, defaults to <see cref="DefaultPageSize"/>. + /// </summary> + [DefaultValue(DefaultPageSize)] + public int PageSize { get; set; } + + /// <summary> + /// Gets or sets the name of the table. + /// <para /> + /// Table name is equal to short class name of a cache value. + /// When a cache has only one type of values, or only one <see cref="QueryEntity" /> defined, + /// table name will be inferred and can be omitted (null). + /// </summary> + /// <value> + /// The name of the table. + /// </value> + public string TableName { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether distributed joins should be enabled for this query. + /// <para /> + /// When disabled, join results will only contain colocated data (joins work locally). + /// When enabled, joins work as expected, no matter how the data is distributed. + /// </summary> + /// <value> + /// <c>true</c> if enable distributed joins should be enabled; otherwise, <c>false</c>. + /// </value> + public bool EnableDistributedJoins { get; set; } + + /// <summary> + /// Gets or sets a value indicating whether join order of tables should be enforced. + /// <para /> + /// When true, query optimizer will not reorder tables in join. + /// <para /> + /// It is not recommended to enable this property until you are sure that your indexes + /// and the query itself are correct and tuned as much as possible but + /// query optimizer still produces wrong join order. + /// </summary> + /// <value> + /// <c>true</c> if join order should be enforced; otherwise, <c>false</c>. + /// </value> + public bool EnforceJoinOrder { get; set; } + } +}
