Repository: ignite Updated Branches: refs/heads/ignite-1232 39c5068ee -> 1fe1e579b
ignite-1232 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fe1e579 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fe1e579 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fe1e579 Branch: refs/heads/ignite-1232 Commit: 1fe1e579b2fd3f4d2122eae71f98babcc64412bb Parents: 39c5068 Author: sboikov <[email protected]> Authored: Tue Jul 12 12:28:58 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Jul 12 17:52:17 2016 +0300 ---------------------------------------------------------------------- .../jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 17 + .../processors/query/h2/IgniteH2Indexing.java | 5 + .../query/h2/opt/GridH2IndexBase.java | 4 +- .../query/h2/opt/GridH2RowDescriptor.java | 8 +- .../processors/query/h2/opt/GridH2Table.java | 4 +- .../query/h2/opt/GridH2TreeIndex.java | 3 + .../cache/GridCacheCrossCacheQuerySelfTest.java | 3 + ...acheDistributedJoinCollocatedAndNotTest.java | 11 +- .../IgniteCacheDistributedJoinNoIndexTest.java | 262 ++++++++++++++++ ...ributedJoinPartitionedAndReplicatedTest.java | 6 +- ...CacheDistributedJoinQueryConditionsTest.java | 18 +- .../cache/IgniteCacheJoinNoIndexTest.java | 248 --------------- ...IgniteCacheJoinQueryWithAffinityKeyTest.java | 4 + .../query/IgniteSqlSplitterSelfTest.java | 307 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 6 +- 16 files changed, 629 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java index 9392cc1..37352e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java @@ -51,7 +51,7 @@ public class TcpDiscoveryJdbcIpFinderSelfTest extends dataSrc.setDriverClass("org.h2.Driver"); if (initSchema) - dataSrc.setJdbcUrl("jdbc:h2:mem"); + dataSrc.setJdbcUrl("jdbc:h2:mem:./test"); else { dataSrc.setJdbcUrl("jdbc:h2:mem:jdbc_ipfinder_not_initialized_schema"); http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index e73c470..f41f4c5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -48,6 +48,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -1193,4 +1194,20 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { return next; } } + + /** + * @param cache Cache. + * @param qry Query. + * @return Query plan. 
+ */ + protected final String queryPlan(IgniteCache<?, ?> cache, SqlFieldsQuery qry) { + return (String)cache.query(new SqlFieldsQuery("explain " + qry.getSql()) + .setArgs(qry.getArgs()) + .setLocal(qry.isLocal()) + .setCollocated(qry.isCollocated()) + .setPageSize(qry.getPageSize()) + .setDistributedJoins(qry.isDistributedJoins()) + .setEnforceJoinOrder(qry.isEnforceJoinOrder())) + .getAll().get(0).get(0); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 8dcccb6..5d25dd4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2738,6 +2738,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ + @Override public CacheConfiguration configuration() { + return schema.ccfg; + } + + /** {@inheritDoc} */ @Override public GridUnsafeGuard guard() { return guard; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 97eb162..9ee417b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -214,10 +214,10 @@ public abstract class GridH2IndexBase extends BaseIndex { if (qctx != null) { if (!tbl.isPartitioned()) { - if (qctx.queryType() == REPLICATED || qctx.queryType() == LOCAL) + if (tblFilter == null || qctx.queryType() == REPLICATED || qctx.queryType() == LOCAL) return null; - if (tblFilter == null || tblFilter != tblFilter.getSelect().getTopFilters().get(0)) + if (tblFilter != tblFilter.getSelect().getTopFilters().get(0)) return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java index aaa2848..f519c30 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -51,7 +52,12 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid * * @return Cache context. */ - public GridCacheContext<?,?> context(); + public GridCacheContext<?, ?> context(); + + /** + * @return Cache configuration. 
+ */ + public CacheConfiguration configuration(); /** * Creates new row. http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 012fb0b..d240c40 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.h2.api.TableEngine; import org.h2.command.ddl.CreateTableData; @@ -58,6 +57,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; @@ -156,7 +156,7 @@ public class GridH2Table extends TableBase { * @return {@code true} If this is a partitioned table. 
*/ public boolean isPartitioned() { - return desc != null && desc.context().isPartitioned(); + return desc != null && desc.configuration().getCacheMode() == PARTITIONED; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 28ca4bd..6ac6645 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -1253,6 +1253,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) { + if (firstRow == null && lastRow == null) + throw new CacheException("Failed to executed distributed join, index is not used for join condition."); + if (findCalled) { findCalled = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 1f10593..eb84fd3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -131,6 +131,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " + "from FactPurchase f, \"replicated\".DimProduct p where p.id = f.productId "); + qry.setEnforceJoinOrder(true); for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) { X.println("___ -> " + o); @@ -164,6 +165,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "from FactPurchase f, \"replicated\".DimProduct p " + "where p.id = f.productId " + "group by f.productId, p.name"); + qry.setEnforceJoinOrder(true); for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) { X.println("___ -> " + o); @@ -181,6 +183,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { "where p.id = f.productId " + "group by f.productId, p.name " + "having s >= 15"); + qry.setEnforceJoinOrder(true); for (List<?> o : qryProc.queryTwoStep(cache.context(), qry).getAll()) { X.println("___ -> " + o); http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java index 705a898..9d3d329 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java @@ -24,6 
+24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.QueryCursor; @@ -93,6 +94,7 @@ public class IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAb entity.setKeyType(Integer.class.getName()); entity.setValueType(Organization.class.getName()); entity.addQueryField("name", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("name"))); ccfg.setQueryEntities(F.asList(entity)); @@ -107,6 +109,7 @@ public class IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAb entity.setValueType(Account.class.getName()); entity.addQueryField("personId", Integer.class.getName(), null); entity.addQueryField("name", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("personId"), new QueryIndex("name"))); ccfg.setQueryEntities(F.asList(entity)); @@ -195,14 +198,6 @@ public class IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAb checkQuery(qry, orgCache, false, 2); - qry = "select o.name, p._key, p.name " + - "from \"org\".Organization o, \"person\".Person p " + - "where p.affKey != o._key"; - - assertTrue(plan(qry, orgCache, false).contains("batched")); - - checkQuery(qry, orgCache, false, 0); - checkQuery("select o.name, p._key, p.name, a.name " + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + "where p.affKey = o._key and p.id = a.personId", orgCache, true, 2); http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java new file mode 100644 index 0000000..51f47aa --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinNoIndexTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + 
TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("orgName", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testJoin() throws Exception { + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + final IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger orgKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + for (int i = 0; i < 3; i++) { + int orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + i)); + + for (int j = 0; j < i; j++) + personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i)); + } + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p " + + "where p.orgName = o.name"); + + qry.setDistributedJoins(true); + + personCache.query(qry).getAll(); + + return null; + } + }, CacheException.class, null); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... 
args) { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String orgName; + + /** + * @param orgId Organization ID. + * @param orgName Organization name. + */ + public Person(int orgId, String orgName) { + this.orgId = orgId; + this.orgName = orgName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. 
+ */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java index 88f0d21..41014ea 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java @@ -177,8 +177,6 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid */ public void testJoin() throws Exception { join(true); - - join(false); } /** @@ -229,7 +227,7 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid checkQuery("select o.name, p._key, p.name, a.name " + "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " + - "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + "where p.orgId = o._key and p._key = a.personId", orgCache, false, 2); checkQuery("select o.name, p._key, p.name, a.name " + "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " + @@ -256,8 +254,6 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid append(cache3).append(" "). 
append("where p.orgId = o._key and p._key = a.personId"); - checkQuery(qry.toString(), orgCache, true, 2); - checkQuery(qry.toString(), orgCache, false, 2); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java index 98630e9..1124067 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java @@ -96,8 +96,6 @@ public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs */ public void testJoinQuery1() throws Exception { joinQuery1(true); - - joinQuery1(false); } /** @@ -245,11 +243,11 @@ public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs /** * @throws Exception If failed. 
*/ - public void testJoinQuery3() throws Exception { + public void _testJoinQuery3() throws Exception { Ignite client = grid(2); try { - CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); IgniteCache<Object, Object> pCache = client.createCache(ccfg1); @@ -307,7 +305,7 @@ public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs try { CacheConfiguration ccfg1 = - cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, false))); IgniteCache<Object, Object> pCache = client.createCache(ccfg1); @@ -342,11 +340,11 @@ public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs checkQuery("select p1._key, p1.name, p2._key, p2.name " + "from Person p1, Person p2 " + - "where p1.name != p2.name", pCache, 6); - - checkQuery("select p1._key, p1.name, p2._key, p2.name " + - "from Person p1, Person p2 " + "where p1.name > p2.name", pCache, 3); + +// checkQuery("select p1._key, p1.name, p2._key, p2.name " + +// "from Person p1, Person p2 " + +// "where p1.name != p2.name", pCache, 6); } finally { client.destroyCache(PERSON_CACHE); @@ -361,7 +359,7 @@ public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbs Ignite client = grid(2); try { - CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); IgniteCache<Object, Object> pCache = 
client.createCache(ccfg1); http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java deleted file mode 100644 index 6a97033..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * - */ -public class IgniteCacheJoinNoIndexTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final String PERSON_CACHE = "person"; - - /** */ - private static final String ORG_CACHE = "org"; - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); - - spi.setIpFinder(IP_FINDER); - - List<CacheConfiguration> ccfgs = new ArrayList<>(); - - { - CacheConfiguration ccfg = 
configuration(PERSON_CACHE); - - QueryEntity entity = new QueryEntity(); - entity.setKeyType(Integer.class.getName()); - entity.setValueType(Person.class.getName()); - entity.addQueryField("orgId", Integer.class.getName(), null); - entity.addQueryField("orgName", String.class.getName(), null); - - ccfg.setQueryEntities(F.asList(entity)); - - ccfgs.add(ccfg); - } - - { - CacheConfiguration ccfg = configuration(ORG_CACHE); - - QueryEntity entity = new QueryEntity(); - entity.setKeyType(Integer.class.getName()); - entity.setValueType(Organization.class.getName()); - entity.addQueryField("name", String.class.getName(), null); - - ccfg.setQueryEntities(F.asList(entity)); - - ccfgs.add(ccfg); - } - - cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); - - cfg.setClientMode(client); - - return cfg; - } - - /** - * @param name Cache name. - * @return Cache configuration. - */ - private CacheConfiguration configuration(String name) { - CacheConfiguration ccfg = new CacheConfiguration(); - - ccfg.setName(name); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); - ccfg.setAtomicityMode(ATOMIC); - ccfg.setBackups(0); - - return ccfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(2); - - client = true; - - startGrid(2); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. 
- */ - public void testJoin() throws Exception { - Ignite client = grid(2); - - Affinity<Object> aff = client.affinity(PERSON_CACHE); - - IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); - IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); - - AtomicInteger pKey = new AtomicInteger(100_000); - AtomicInteger orgKey = new AtomicInteger(); - - ClusterNode node0 = ignite(0).cluster().localNode(); - ClusterNode node1 = ignite(1).cluster().localNode(); - - for (int i = 0; i < 3; i++) { - int orgId = keyForNode(aff, orgKey, node0); - - orgCache.put(orgId, new Organization("org-" + i)); - - for (int j = 0; j < i; j++) - personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i)); - } - - checkQuery("select o.name, p._key, p.orgName " + - "from \"org\".Organization o, \"person\".Person p " + - "where p.orgName = o.name", personCache, false, 3); - } - - /** - * @param sql SQL. - * @param cache Cache. - * @param enforceJoinOrder Enforce join order flag. - * @param expSize Expected results size. - * @param args Arguments. - */ - private void checkQuery(String sql, - IgniteCache<Object, Object> cache, - boolean enforceJoinOrder, - int expSize, - Object... args) { - String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) - .setDistributedJoins(true) - .setEnforceJoinOrder(enforceJoinOrder)) - .getAll().get(0).get(0); - - log.info("Plan: " + plan); - - SqlFieldsQuery qry = new SqlFieldsQuery(sql); - - qry.setDistributedJoins(true); - qry.setEnforceJoinOrder(enforceJoinOrder); - qry.setArgs(args); - - QueryCursor<List<?>> cur = cache.query(qry); - - List<List<?>> res = cur.getAll(); - - if (expSize != res.size()) - log.info("Results: " + res); - - assertEquals(expSize, res.size()); - } - - /** - * - */ - private static class Person implements Serializable { - /** */ - int orgId; - - /** */ - String orgName; - - /** - * @param orgId Organization ID. - * @param orgName Organization name. 
- */ - public Person(int orgId, String orgName) { - this.orgId = orgId; - this.orgName = orgName; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Person.class, this); - } - } - - /** - * - */ - private static class Organization implements Serializable { - /** */ - String name; - - /** - * @param name Name. - */ - public Organization(String name) { - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(Organization.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java index dfc450c..d27fe1b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; @@ -296,6 +297,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT account.setValueType(affKey ? 
AccountKeyWithAffinity.class.getName() : Account.class.getName()); account.addQueryField("personKey", personKeyType, null); account.addQueryField("personId", Integer.class.getName(), null); + account.setIndexes(F.asList(new QueryIndex("personKey"), new QueryIndex("personId"))); QueryEntity person = new QueryEntity(); person.setKeyType(personKeyType); @@ -303,6 +305,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT person.addQueryField("orgId", Integer.class.getName(), null); person.addQueryField("id", Integer.class.getName(), null); person.addQueryField("name", String.class.getName(), null); + person.setIndexes(F.asList(new QueryIndex("orgId"), new QueryIndex("id"), new QueryIndex("name"))); if (affKey && includeAffKey) person.addQueryField("affKey", Integer.class.getName(), null); @@ -311,6 +314,7 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT org.setKeyType(Integer.class.getName()); org.setValueType(Organization.class.getName()); org.addQueryField("name", String.class.getName(), null); + org.setIndexes(F.asList(new QueryIndex("name"))); ccfg.setQueryEntities(F.asList(account, person, org)); http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index fe209d6..0a7ea8b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCache; import 
org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; @@ -371,6 +372,270 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDistributedJoinsPlan() throws Exception { + List<IgniteCache<Object, Object>> caches = new ArrayList<>(); + + IgniteCache<Object, Object> persPart = + ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class)); + caches.add(persPart); + + IgniteCache<Object, Object> persPartAff = + ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class)); + caches.add(persPartAff); + + IgniteCache<Object, Object> orgPart = + ignite(0).createCache(cacheConfig("orgPart", true, Integer.class, Organization.class)); + caches.add(orgPart); + + IgniteCache<Object, Object> orgPartAff = + ignite(0).createCache(cacheConfig("orgPartAff", true, TestKey.class, Organization.class)); + caches.add(orgPartAff); + + IgniteCache<Object, Object> orgRepl = + ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class)); + caches.add(orgRepl); + + try { + // Join two partitioned. 
+ + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPart\".Organization o " + + "where p.orgId = o._key", + "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPartAff\".Organization o " + + "where p.orgId = o.affKey", + "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPart\".Organization o " + + "where p.orgId = o._key", + "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p inner join \"orgPart\".Organization o " + + "on p.orgId = o._key", + "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p left outer join \"orgPart\".Organization o " + + "on p.orgId = o._key", + "batched:unicast"); + + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgPart\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o._key", + "batched:broadcast"); + + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgPartAff\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o.affKey", + "batched:broadcast"); + + // Join partitioned and replicated. 
+ + checkQueryPlanContains(persPart, + true, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgRepl\".Organization o " + + "where p.orgId = o._key"); + + checkQueryPlanContains(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgRepl\".Organization o " + + "where p.orgId = o._key"); + + checkQueryPlanContains(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p inner join \"orgRepl\".Organization o " + + "on p.orgId = o._key"); + + checkQueryPlanContains(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p left outer join \"orgRepl\".Organization o " + + "on p.orgId = o._key"); + + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgRepl\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o._key", + "batched:broadcast"); + + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgRepl\".Organization o inner join \"persPart\".Person2 p " + + "on p.orgId = o._key", + "batched:broadcast"); + + checkQueryPlanContains(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " + + "on p.orgId = o._key", + "batched:broadcast"); + + // Join on affinity keys. 
+ + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "\"persPart\".Person2 p", + "\"orgPart\".Organization o", + "where p._key = o._key"); + + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "\"persPart\".Person2 p", + "\"orgRepl\".Organization o", + "where p._key = o._key"); + + checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ", + "\"persPartAff\".Person2 p", + "\"orgPart\".Organization o", + "where p.affKey = o._key"); + + checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ", + "\"persPartAff\".Person2 p", + "\"orgRepl\".Organization o", + "where p.affKey = o._key"); + } + finally { + for (IgniteCache<Object, Object> cache : caches) + ignite(0).destroyCache(cache.getName()); + } + } + + /** + * @param cache Query cache. + * @param select Select clause. + * @param cache1 Cache name1. + * @param cache2 Cache name2. + * @param where Where clause. + */ + private void checkNoBatchedJoin(IgniteCache<Object, Object> cache, + String select, + String cache1, + String cache2, + String where) { + checkQueryPlanContains(cache, + true, + 0, + select + + "from " + cache1 + "," + cache2 + " "+ where); + checkQueryPlanContains(cache, + false, + 0, + select + + "from " + cache1 + "," + cache2 + " "+ where); + checkQueryPlanContains(cache, + true, + 0, + select + + "from " + cache2 + "," + cache1 + " "+ where); + checkQueryPlanContains(cache, + false, + 0, + select + + "from " + cache2 + "," + cache1 + " "+ where); + } + + /** + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expBatchedJoins Expected batched joins count. + * @param sql Query. + * @param expText Expected text to find in plan. 
+ */ + private void checkQueryPlanContains(IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expBatchedJoins, + String sql, + String...expText) { + checkQueryPlanContains(cache, + enforceJoinOrder, + expBatchedJoins, + new SqlFieldsQuery(sql), + expText); + + checkQueryPlanContains(cache, + enforceJoinOrder, + expBatchedJoins, + new SqlFieldsQuery("select * from (" + sql + ")"), + expText); + } + + /** + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expBatchedJoins Expected batched joins count. + * @param qry Query. + * @param expText Expected text fragments; each must occur in the plan after the previous one. + */ + private void checkQueryPlanContains(IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expBatchedJoins, + SqlFieldsQuery qry, + String...expText) { + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setDistributedJoins(true); + + String plan = queryPlan(cache, qry); + + log.info("Plan: " + plan); + + assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']', + expBatchedJoins, + StringUtils.countOccurrencesOf(plan, "batched")); + + int startIdx = 0; + + for (String exp : expText) { + int idx = plan.indexOf(exp, startIdx); + + if (idx == -1) { + fail("Plan does not contain expected string [startIdx=" + startIdx + + ", plan=" + plan + + ", exp=" + exp + ']'); + } + + startIdx = idx + 1; + } + } + + /** * Test HAVING clause. */ public void testHaving() { @@ -658,6 +923,48 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { /** * */ + private static class TestKey implements Serializable { + /** */ + @QuerySqlField(index = true) + @AffinityKeyMapped + int affKey; + + /** */ + @QuerySqlField() + int id; + + /** + * @param affKey Affinity key. + * @param id ID.
+ */ + public TestKey(int affKey, int id) { + this.affKey = affKey; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey personKey = (TestKey)o; + + return id == personKey.id; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ private static class Organization implements Serializable { /** */ @QuerySqlField http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe1e579/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index bd39c49..50bf51c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -27,10 +27,11 @@ import org.apache.ignite.internal.processors.cache.IgniteBinaryWrappedObjectFiel import org.apache.ignite.internal.processors.cache.IgniteCacheCollocatedQuerySelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheCrossCacheJoinRandomTest; import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCollocatedAndNotTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinCustomAffinityMapper; import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinPartitionedAndReplicatedTest; import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinQueryConditionsTest; import org.apache.ignite.internal.processors.cache.IgniteCacheDuplicateEntityConfigurationSelfTest; -import 
org.apache.ignite.internal.processors.cache.IgniteCacheJoinNoIndexTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheDistributedJoinNoIndexTest; import org.apache.ignite.internal.processors.cache.IgniteCacheJoinPartitionedAndReplicatedTest; import org.apache.ignite.internal.processors.cache.IgniteCacheJoinQueryWithAffinityKeyTest; import org.apache.ignite.internal.processors.cache.IgniteCacheLargeResultSelfTest; @@ -129,7 +130,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheDistributedJoinPartitionedAndReplicatedTest.class); suite.addTestSuite(IgniteCacheDistributedJoinQueryConditionsTest.class); suite.addTestSuite(IgniteCacheJoinPartitionedAndReplicatedTest.class); - suite.addTestSuite(IgniteCacheJoinNoIndexTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinNoIndexTest.class); + suite.addTestSuite(IgniteCacheDistributedJoinCustomAffinityMapper.class); suite.addTestSuite(IgniteCrossCachesJoinsQueryTest.class); suite.addTestSuite(IgniteCacheCrossCacheJoinRandomTest.class);
