http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java new file mode 100644 index 0000000..d7610dc --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private static final String ACCOUNT_CACHE = "acc"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name, CacheMode cacheMode) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setCacheMode(cacheMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); + + return ccfg; + } + + /** + * @param idx Use index flag. + * @param persCacheMode Person cache mode. + * @param orgCacheMode Organization cache mode. + * @param accCacheMode Account cache mode. + * @return Configurations. + */ + private List<CacheConfiguration> caches(boolean idx, + CacheMode persCacheMode, + CacheMode orgCacheMode, + CacheMode accCacheMode) { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE, persCacheMode); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + if (idx) + entity.setIndexes(F.asList(new QueryIndex("orgId"), new QueryIndex("name"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE, orgCacheMode); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + if (idx) + entity.setIndexes(F.asList(new QueryIndex("name"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, accCacheMode); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + if (idx) { + entity.setIndexes(F.asList(new QueryIndex("orgId"), + new QueryIndex("personId"), + new QueryIndex("name"))); + } + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + return ccfgs; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoin1() throws Exception { + join(true, REPLICATED, PARTITIONED, PARTITIONED); + } + + /** + * @throws Exception If failed. + */ + public void testJoin2() throws Exception { + join(true, PARTITIONED, REPLICATED, PARTITIONED); + } + + /** + * @throws Exception If failed. + */ + public void testJoin3() throws Exception { + join(true, PARTITIONED, PARTITIONED, REPLICATED); + } + + /** + * @param idx Use index flag. + * @param persCacheMode Person cache mode. + * @param orgCacheMode Organization cache mode. + * @param accCacheMode Account cache mode. + * @throws Exception If failed. + */ + private void join(boolean idx, CacheMode persCacheMode, CacheMode orgCacheMode, CacheMode accCacheMode) + throws Exception { + Ignite client = grid(2); + + for (CacheConfiguration ccfg : caches(idx, persCacheMode, orgCacheMode, accCacheMode)) + client.createCache(ccfg); + + try { + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger accKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + /** + * One organization, one person, two accounts. + */ + + { + int orgId1 = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId1, new Organization("obj-" + orgId1)); + + int pid1 = keyForNode(aff, pKey, node0); + personCache.put(pid1, new Person(orgId1, "o1-p1")); + + accCache.put(keyForNode(aff, accKey, node0), new Account(pid1, orgId1, "a0")); + accCache.put(keyForNode(aff, accKey, node1), new Account(pid1, orgId1, "a1")); + } + + IgniteCache<Object, Object> qryCache = replicated(orgCache) ? personCache : orgCache; + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p, \"acc\".Account a " + + "where p._key = a.personId", qryCache, false, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 2); + + checkQuery("select * from (select o.name n1, p._key, p.name n2, a.name n3 " + + "from \"acc\".Account a, \"person\".Person p, \"org\".Organization o " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key)", qryCache, false, 2); + + checkQuery("select * from (select o.name n1, p._key, p.name n2, a.name n3 " + + "from \"person\".Person p, \"acc\".Account a, \"org\".Organization o " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key)", qryCache, false, 2); + + List<List<?>> res = checkQuery("select count(*) " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 1); + + assertEquals(2L, res.get(0).get(0)); + + checkQueries(qryCache, 2); + + { + int orgId2 = keyForNode(aff, orgKey, node1); + + orgCache.put(orgId2, new Organization("obj-" + orgId2)); + + int pid2 = keyForNode(aff, pKey, node0); + personCache.put(pid2, new Person(orgId2, "o2-p1")); + + accCache.put(keyForNode(aff, accKey, node0), new Account(pid2, orgId2, "a3")); + accCache.put(keyForNode(aff, accKey, node1), new Account(pid2, orgId2, "a4")); + } + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 4); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o inner join \"person\".Person p on p.orgId = o._key " + + "inner join \"acc\".Account a on p._key = a.personId and a.orgId=o._key", qryCache, false, 4); + + res = checkQuery("select count(*) " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key", qryCache, false, 1); + + assertEquals(4L, res.get(0).get(0)); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and a.orgId = o._key and a.orgId=o._key", qryCache, false, 4); + + res = checkQuery("select count(*) " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.orgId = o._key and a.orgId = o._key and a.orgId=o._key", qryCache, false, 1); + + assertEquals(4L, res.get(0).get(0)); + + checkQueries(qryCache, 4); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + client.destroyCache(ACCOUNT_CACHE); + } + } + + /** + * @param qryCache Query cache. + * @param expSize Expected results size. + */ + private void checkQueries(IgniteCache<Object, Object> qryCache, int expSize) { + String[] cacheNames = {"\"org\".Organization o", "\"person\".Person p", "\"acc\".Account a"}; + + for (int c1 = 0; c1 < cacheNames.length; c1++) { + for (int c2 = 0; c2 < cacheNames.length; c2++) { + if (c2 == c1) + continue; + + for (int c3 = 0; c3 < cacheNames.length; c3++) { + if (c3 == c1 || c3 == c2) + continue; + + String cache1 = cacheNames[c1]; + String cache2 = cacheNames[c2]; + String cache3 = cacheNames[c3]; + + String qry = "select o.name, p._key, p.name, a.name from " + + cache1 + ", " + + cache2 + ", " + + cache3 + " " + + "where p.orgId = o._key and p._key = a.personId and a.orgId=o._key"; + + checkQuery(qry, qryCache, false, expSize); + + qry = "select o.name, p._key, p.name, a.name from " + + cache1 + ", " + + cache2 + ", " + + cache3 + " " + + "where p.orgId = o._key and a.orgId = o._key and a.orgId=o._key"; + + checkQuery(qry, qryCache, false, expSize); + } + } + } + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + * @return Results. + */ + private List<List<?>> checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... args) { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + log.info("Plan: " + queryPlan(cache, qry)); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + + return res; + } + + /** + * @param cache Cache. + * @return {@code True} if cache is replicated. + */ + private boolean replicated(IgniteCache<?, ?> cache) { + return cache.getConfiguration(CacheConfiguration.class).getCacheMode() == REPLICATED; + } + + /** + * + */ + private static class Account implements Serializable { + /** */ + int personId; + + /** */ + int orgId; + + /** */ + String name; + + /** + * @param personId Person ID. + * @param orgId Organization ID. + * @param name Name. + */ + public Account(int personId, int orgId, String name) { + this.personId = personId; + this.orgId = orgId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java new file mode 100644 index 0000000..af56c91 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private boolean client; + + /** */ + private int total; + + /** */ + private CacheMemoryMode memMode = ONHEAP_TIERED; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi) cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery1() throws Exception { + joinQuery1(true); + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery1Offheap() throws Exception { + memMode = OFFHEAP_TIERED; + + testJoinQuery1(); + } + + /** + * @param idx Use index flag. + * @throws Exception If failed. + */ + private void joinQuery1(boolean idx) throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(idx, idx))); + CacheConfiguration ccfg2 = + cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(idx))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + client.createCache(ccfg2); + + List<Integer> orgIds = putData1(); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key", pCache, total); + + checkQuery("select * from (select o._key, o.name, p._key pKey, p.name pName " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key)", pCache, total); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o inner join Person p " + + "on p.orgId = o._key", pCache, total); + + checkQuery("select * from (select o._key o_key, o.name o_name, p._key p_key, p.name p_name " + + "from \"org\".Organization o inner join Person p " + + "on p.orgId = o._key)", pCache, total); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key=" + orgIds.get(3), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key > " + orgIds.get(2), pCache, total - 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and o._key > " + orgIds.get(1) + " and o._key < " + orgIds.get(4), pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name", pCache, total); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key=" + orgIds.get(0), pCache, 0); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key=" + orgIds.get(3), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.name = o.name and o.name='obj-" + orgIds.get(3) + "'", pCache, 3); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery2() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + List<Integer> pIds = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId, "p-" + orgId)); + + pIds.add(pId); + } + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key >= 0", pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key=" + pIds.get(0), pCache, 1); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId = o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 2); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void _testJoinQuery3() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + List<Integer> pIds = new ArrayList<>(); + + for (int i = 0; i < 3; i++) { + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId)); + + pIds.add(pId); + } + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key", pCache, 9); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 3); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 6); + + checkQuery("select o._key, o.name, p._key, p.name " + + "from \"org\".Organization o, Person p " + + "where p.orgId != o._key and p._key >=" + pIds.get(0) + "and p._key <= " + pIds.get(2), pCache, 9); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery4() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger pKey = new AtomicInteger(); + + Integer pId0 = keyForNode(aff, pKey, node0); + + pCache.put(pId0, new Person(0, "p0")); + + for (int i = 0; i < 3; i++) { + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(0, "p")); + } + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key", pCache, 6); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key and p1._key=" + pId0, pCache, 3); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p2._key > p1._key and p1.name='p0'", pCache, 3); + + checkQuery("select p1._key, p1.name, p2._key, p2.name " + + "from Person p1, Person p2 " + + "where p1.name > p2.name", pCache, 3); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery5() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId, "p-" + orgId)); + + checkQuery("select o._key from \"org\".Organization o, Person p where p.orgId = o._key", pCache, 1); + + // Distributed join is not enabled for expressions, just check query does not fail. + checkQuery("select o.name from \"org\".Organization o where o._key in " + + "(select o._key from \"org\".Organization o, Person p where p.orgId = o._key)", pCache, 0); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoinQuery6() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = + cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(true, true))); + CacheConfiguration ccfg2 = + cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(true))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + + client.createCache(ccfg2); + + putData1(); + + checkQuery("select _key, name from \"org\".Organization o " + + "inner join (select orgId from Person) p on p.orgId = o._key", pCache, total); + + checkQuery("select o._key, o.name from (select _key, name from \"org\".Organization) o " + + "inner join Person p on p.orgId = o._key", pCache, total); + + checkQuery("select o._key, o.name from (select _key, name from \"org\".Organization) o " + + "inner join (select orgId from Person) p on p.orgId = o._key", pCache, total); + + checkQuery("select * from " + + "(select _key, name from \"org\".Organization) o " + + "inner join " + + "(select orgId from Person) p " + + "on p.orgId = o._key", pCache, total); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, IgniteCache<Object, Object> cache, int expSize, Object... args) { + log.info("Execute query: " + sql); + + checkQuery(sql, cache, false, expSize, args); + + checkQuery(sql, cache, true, expSize, args); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + * @param args Arguments. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize, + Object... args) { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setArgs(args); + + log.info("Plan: " + queryPlan(cache, qry)); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + + /** + * @param idxName Name index flag. + * @param idxOrgId Org ID index flag. + * @return Entity. + */ + private QueryEntity personEntity(boolean idxName, boolean idxOrgId) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + List<QueryIndex> idxs = new ArrayList<>(); + + if (idxName) { + QueryIndex idx = new QueryIndex("name"); + + idxs.add(idx); + } + + if (idxOrgId) { + QueryIndex idx = new QueryIndex("orgId"); + + idxs.add(idx); + } + + entity.setIndexes(idxs); + + return entity; + } + + /** + * @param idxName Name index flag. + * @return Entity. + */ + private QueryEntity organizationEntity(boolean idxName) { + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + + entity.addQueryField("name", String.class.getName(), null); + + if (idxName) { + QueryIndex idx = new QueryIndex("name"); + + entity.setIndexes(F.asList(idx)); + } + + return entity; + } + + /** + * @return Organization ids. + */ + private List<Integer> putData1() { + total = 0; + + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + + AtomicInteger pKey = new AtomicInteger(); + AtomicInteger orgKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + List<Integer> data = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + int orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("obj-" + orgId)); + + for (int j = 0; j < i; j++) { + personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "obj-" + orgId)); + + total++; + } + + data.add(orgId); + } + + return data; + } + + /** + * @param name Cache name. + * @return Configuration. + */ + private CacheConfiguration cacheConfiguration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + ccfg.setMemoryMode(memMode); + + return ccfg; + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java new file mode 100644 index 0000000..ea3b141 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Iterator; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + */ +public class IgniteCacheDistributedJoinTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static Connection conn; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + CacheConfiguration<Integer, A> ccfga = new CacheConfiguration<>(); + + ccfga.setName("a"); + ccfga.setSqlSchema("A"); + ccfga.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfga.setBackups(1); + ccfga.setCacheMode(CacheMode.PARTITIONED); + ccfga.setIndexedTypes(Integer.class, A.class); + + CacheConfiguration<Integer, B> ccfgb = new CacheConfiguration<>(); + + ccfgb.setName("b"); + ccfgb.setSqlSchema("B"); + ccfgb.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfgb.setBackups(1); + ccfgb.setCacheMode(CacheMode.PARTITIONED); + ccfgb.setIndexedTypes(Integer.class, B.class); + + CacheConfiguration<Integer, C> ccfgc = new CacheConfiguration<>(); + + ccfgc.setName("c"); + ccfgc.setSqlSchema("C"); + ccfgc.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfgc.setBackups(1); + ccfgc.setCacheMode(CacheMode.PARTITIONED); + ccfgc.setIndexedTypes(Integer.class, C.class); + + cfg.setCacheConfiguration(ccfga, ccfgb, ccfgc); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(4); + + awaitPartitionMapExchange(); + + conn = DriverManager.getConnection("jdbc:h2:mem:"); + + Statement s = conn.createStatement(); + + s.execute("create schema a"); + s.execute("create schema b"); + s.execute("create schema c"); + + s.execute("create table a.a(a bigint, b bigint, c bigint)"); + s.execute("create table b.b(a bigint, b bigint, c bigint)"); + s.execute("create table c.c(a bigint, b bigint, c bigint)"); + + s.execute("create index on a.a(a)"); + s.execute("create index on a.a(b)"); + s.execute("create index on a.a(c)"); + + s.execute("create index on b.b(a)"); + s.execute("create index on b.b(b)"); + s.execute("create index on b.b(c)"); + + s.execute("create index on c.c(a)"); + s.execute("create index on c.c(b)"); + s.execute("create index on c.c(c)"); + + GridRandom rnd = new GridRandom(); + Ignite ignite = ignite(0); + + IgniteCache<Integer,A> a = ignite.cache("a"); + IgniteCache<Integer,B> b = ignite.cache("b"); + IgniteCache<Integer,C> c = ignite.cache("c"); + + for (int i = 0; i < 100; i++) { + a.put(i, insert(s, new A(rnd.nextInt(50), rnd.nextInt(100), rnd.nextInt(150)))); + b.put(i, insert(s, new B(rnd.nextInt(100), rnd.nextInt(50), rnd.nextInt(150)))); + c.put(i, insert(s, new C(rnd.nextInt(150), rnd.nextInt(100), rnd.nextInt(50)))); + } + + checkSameResult(s, a, "select a, count(*) from a group by a order by a"); + checkSameResult(s, a, "select b, count(*) from a group by b order by b"); + checkSameResult(s, a, "select c, count(*) from a group by c order by c"); + + checkSameResult(s, b, "select a, count(*) from b group by a order by a"); + checkSameResult(s, b, "select b, count(*) from b group by b order by b"); + checkSameResult(s, b, "select c, count(*) from b group by c order by c"); + + checkSameResult(s, c, "select a, count(*) from c group by a order by a"); + checkSameResult(s, c, "select b, count(*) from c group by b order by b"); + checkSameResult(s, c, "select c, count(*) from c group by c order by c"); + + s.close(); + } + + /** + * @param s Statement. + * @param c Cache. + * @param qry Query. + * @throws SQLException If failed. + */ + private <Z extends X> void checkSameResult(Statement s, IgniteCache<Integer, Z> c, String qry) throws SQLException { + s.executeUpdate("SET SCHEMA " + c.getName()); + + try ( + ResultSet rs1 = s.executeQuery(qry); + QueryCursor<List<?>> rs2 = c.query(new SqlFieldsQuery(qry).setDistributedJoins(true)) + ) { + Iterator<List<?>> iter = rs2.iterator(); + + for (;;) { + if (!rs1.next()) { + assertFalse(iter.hasNext()); + + return; + } + + assertTrue(iter.hasNext()); + + List<?> row = iter.next(); + + for (int i = 0; i < row.size(); i++) + assertEquals(rs1.getLong(i + 1), row.get(i)); + } + } + } + + /** + * @param s Statement. + * @param z Value. + * @return Value. + * @throws SQLException If failed. + */ + private static <Z extends X> Z insert(Statement s, Z z) throws SQLException { + String tbl = z.getClass().getSimpleName(); + + tbl = tbl + "." + tbl; + + String insert = "insert into " + tbl + " values(" + z.a + ", " + z.b + ", " + z.c + ")"; + + s.executeUpdate(insert); + + return z; + } + + /** + * @throws Exception If failed. + */ + public void testJoins() throws Exception { + Ignite ignite = ignite(0); + + IgniteCache<Integer,A> a = ignite.cache("a"); + IgniteCache<Integer,B> b = ignite.cache("b"); + IgniteCache<Integer,C> c = ignite.cache("c"); + + Statement s = conn.createStatement(); + + checkSameResult(s, a, "select a.c, b.b, c.a from a.a, b.b, c.c where a.a = b.a and b.c = c.c order by a.c, b.b, c.a"); + checkSameResult(s, b, "select a.a, b.c, c.b from a.a, b.b, c.c where a.b = b.b and b.a = c.a order by a.a, b.c, c.b"); + checkSameResult(s, c, "select a.b, b.a, c.c from a.a, b.b, c.c where a.c = b.c and b.b = c.b order by a.b, b.a, c.c"); + + for (int i = 0; i < 150; i++) { + checkSameResult(s, a, "select a.c, b.b, c.a from a.a, b.b, c.c where " + + i + " = a.c and a.a = b.a and b.c = c.c order by a.c, b.b, c.a"); + checkSameResult(s, b, "select a.a, b.c, c.b from a.a, b.b, c.c where " + + i + " = c.b and a.b = b.b and b.a = c.a order by a.a, b.c, c.b"); + checkSameResult(s, c, "select a.b, b.a, c.c from a.a, b.b, c.c where " + + i + " = b.c and a.c = b.c and b.b = c.b order by a.b, b.a, c.c"); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + U.closeQuiet(conn); + stopAllGrids(); + } + + /** + * + */ + public static class X { + /** */ + public long a; + + /** */ + public long b; + + /** */ + public long c; + + /** + * @param a A. + * @param b B. + * @param c C. + */ + public X(int a, int b, int c) { + this.a = a; + this.b = b; + this.c = c; + } + + /** */ + @QuerySqlField(index = true) + public long getA() { + return a; + } + + /** */ + @QuerySqlField(index = true) + public long getB() { + return b; + } + + /** */ + @QuerySqlField(index = true) + public long getC() { + return c; + } + } + + /** + * + */ + public static class A extends X { + /** + * @param a A. + * @param b B. + * @param c C. + */ + public A(int a, int b, int c) { + super(a, b, c); + } + } + + /** + * + */ + public static class B extends X { + /** + * @param a A. + * @param b B. + * @param c C. + */ + public B(int a, int b, int c) { + super(a, b, c); + } + } + + /** + * + */ + public static class C extends X { + /** + * @param a A. + * @param b B. + * @param c C. + */ + public C(int a, int b, int c) { + super(a, b, c); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java new file mode 100644 index 0000000..fa9032d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends AbstractH2CompareQueryTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ACCOUNT_CACHE = "acc"; + + /** */ + private boolean client; + + /** */ + private boolean h2DataInserted; + + /** {@inheritDoc} */ + @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void initCacheAndDbData() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void checkAllDataEquals() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration personCache() { + CacheConfiguration ccfg = configuration(PERSON_CACHE, 0); + + // Person cache is replicated. + ccfg.setCacheMode(REPLICATED); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + return ccfg; + } + + /** + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration accountCache(int backups) { + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, backups); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("personId"))); + + ccfg.setQueryEntities(F.asList(entity)); + + return ccfg; + } + + /** + * @param name Cache name. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name, int backups) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(backups); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + client = true; + + startGrid(SRVS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected Statement initializeH2Schema() throws SQLException { + Statement st = super.initializeH2Schema(); + + st.execute("CREATE SCHEMA \"person\""); + st.execute("CREATE SCHEMA \"acc\""); + + st.execute("create table \"person\".PERSON" + + " (_key int not null," + + " _val other not null," + + " name varchar(255))"); + + st.execute("create table \"acc\".ACCOUNT" + + " (_key int not null," + + " _val other not null," + + " personId int," + + " name varchar(255))"); + + return st; + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(SRVS); + + client.createCache(personCache()); + + checkJoin(0); + + h2DataInserted = true; + + checkJoin(1); + + checkJoin(2); + } + + /** + * @param accBackups Account cache backups. + * @throws Exception If failed. + */ + private void checkJoin(int accBackups) throws Exception { + Ignite client = grid(SRVS); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger accKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + try { + IgniteCache<Object, Object> accCache = client.createCache(accountCache(accBackups)); + + Integer pKey1 = keyForNode(aff, pKey, node0); // No accounts. + insert(personCache, pKey1, new Person("p1")); + + Integer pKey2 = keyForNode(aff, pKey, node0); // 1 collocated account. + insert(personCache, pKey2, new Person("p2")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey2, "a-p2")); + + Integer pKey3 = keyForNode(aff, pKey, node0); // 1 non-collocated account. + insert(personCache, pKey3, new Person("p3")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey3, "a-p3")); + + Integer pKey4 = keyForNode(aff, pKey, node0); // 1 collocated, 1 non-collocated account. + insert(personCache, pKey4, new Person("p4")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey4, "a-p4-1")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey4, "a-p4-2")); + + Integer pKey5 = keyForNode(aff, pKey, node0); // 2 collocated accounts. + insert(personCache, pKey5, new Person("p5")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey5, "a-p5-1")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey5, "a-p5-1")); + + Integer pKey6 = keyForNode(aff, pKey, node0); // 2 non-collocated accounts. + insert(personCache, pKey6, new Person("p6")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey6, "a-p5-1")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey6, "a-p5-1")); + + Integer[] keys = {pKey1, pKey2, pKey3, pKey4, pKey5, pKey6}; + + for (int i = 0; i < keys.length; i++) { + log.info("Test key: " + i); + + Integer key = keys[i]; + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p, \"acc\".Account a " + + "where p._key = a.personId and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"acc\".Account a, \"person\".Person p " + + "where p._key = a.personId and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p right outer join \"acc\".Account a " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"acc\".Account a left outer join \"person\".Person p " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + +// checkQuery("select p._key, p.name, a.name " + +// "from \"acc\".Account a right outer join \"person\".Person p " + +// "on (p._key = a.personId) and p._key=?", accCache, true, key); +// +// checkQuery("select p._key, p.name, a.name " + +// "from \"person\".Person p left outer join \"acc\".Account a " + +// "on (p._key = a.personId) and p._key=?", accCache, true, key); + } + } + finally { + client.destroyCache(ACCOUNT_CACHE); + + personCache.removeAll(); + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param p Person. + * @throws Exception If failed. + */ + private void insert(IgniteCache<Object, Object> cache, int key, Person p) throws Exception { + cache.put(key, p); + + if (h2DataInserted) + return; + + try(PreparedStatement st = conn.prepareStatement("insert into \"person\".PERSON " + + "(_key, _val, name) values(?, ?, ?)")) { + st.setObject(1, key); + st.setObject(2, p); + st.setObject(3, p.name); + + st.executeUpdate(); + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param a Account. + * @throws Exception If failed. + */ + private void insert(IgniteCache<Object, Object> cache, int key, Account a) throws Exception { + cache.put(key, a); + + if (h2DataInserted) + return; + + try(PreparedStatement st = conn.prepareStatement("insert into \"acc\".ACCOUNT " + + "(_key, _val, personId, name) values(?, ?, ?, ?)")) { + st.setObject(1, key); + st.setObject(2, a); + st.setObject(3, a.personId); + st.setObject(4, a.name); + + st.executeUpdate(); + } + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param args Arguments. + * @throws Exception If failed. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + Object... args) throws Exception { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setArgs(args) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + compareQueryRes0(cache, sql, true, enforceJoinOrder, args, Ordering.RANDOM); + } + + /** + * + */ + private static class Account implements Serializable { + /** */ + int personId; + + /** */ + String name; + + /** + * @param personId Person ID. + * @param name Name. + */ + public Account(int personId, String name) { + this.personId = personId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Person(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java index 39c0cb5..1d528c9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java @@ -17,9 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; @@ -32,8 +29,15 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import javax.cache.CacheException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -53,6 +57,9 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr private static final String ORG_CACHE = "org"; /** */ + private static final String ORG_CACHE_REPLICATED = "orgRepl"; + + /** */ private boolean client; /** {@inheritDoc} */ @@ -95,6 +102,22 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr ccfgs.add(ccfg); } + { + CacheConfiguration ccfg = configuration(ORG_CACHE_REPLICATED); + + ccfg.setCacheMode(REPLICATED); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("id", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); cfg.setClientMode(client); @@ -144,10 +167,12 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + IgniteCache<Object, Object> orgCacheRepl = client.cache(ORG_CACHE_REPLICATED); List<Integer> keys = primaryKeys(ignite(0).cache(PERSON_CACHE), 3, 200_000); orgCache.put(keys.get(0), new Organization(0, "org1")); + orgCacheRepl.put(keys.get(0), new Organization(0, "org1")); personCache.put(keys.get(1), new Person(0, "p1")); personCache.put(keys.get(2), new Person(0, "p2")); @@ -160,12 +185,54 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr "on (p.orgId = o.id)", orgCache, 2); checkQuery("select o.name, p._key, p.name " + - "from \"person\".Person p left join \"org\".Organization o " + - "on (p.orgId = o.id)", orgCache, 2); + "from \"person\".Person p join \"orgRepl\".Organization o " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"orgRepl\".Organization o join \"person\".Person p " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select p.name from \"person\".Person p", ignite(0).cache(PERSON_CACHE), 2); + checkQuery("select p.name from \"person\".Person p", ignite(1).cache(PERSON_CACHE), 2); + + for (int i = 0; i < 10; i++) + checkQuery("select p.name from \"person\".Person p", personCache, 2); checkQuery("select o.name, p._key, p.name " + "from \"org\".Organization o left join \"person\".Person p " + "on (p.orgId = o.id)", orgCache, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"person\".Person p left join \"orgRepl\".Organization o " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"orgRepl\".Organization o left join \"person\".Person p " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQueryFails("select o.name, p._key, p.name " + + "from \"person\".Person p left join \"org\".Organization o " + + "on (p.orgId = o.id)", personCache); + + checkQueryFails("select o.name, p._key, p.name " + + "from \"org\".Organization o right join \"person\".Person p " + + "on (p.orgId = o.id)", personCache); + } + + /** + * @param sql SQL. + * @param cache Cache. + */ + private void checkQueryFails(final String sql, final IgniteCache<Object, Object> cache) { + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + cache.query(qry).getAll(); + + return null; + } + }, CacheException.class, null); } /** @@ -233,6 +300,7 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr int id; /** + * @param id ID. * @param name Name. */ public Organization(int id, String name) {
