http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java new file mode 100644 index 0000000..0e6806f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.io.Serializable; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.typedef.CAX; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Test for distributed queries with node restarts. + */ +public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridCommonAbstractTest { + /** */ + private static final String QRY_0 = "select co._key, count(*) cnt\n" + + "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + + "group by co._key order by cnt desc, co._key"; + + /** */ + private static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" + + "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" + + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + + "group by co._key order by cnt desc, co._key"; + + /** */ + private static final String QRY_1 = "select pr._key, co._key\n" + + "from \"pr\".Product pr, \"co\".Company co\n" + + "where pr.companyId = co._key\n" + + "order by co._key, pr._key "; + + /** */ + private static final String QRY_1_BROADCAST = "select pr._key, co._key\n" + + "from \"co\".Company co, \"pr\".Product pr \n" + + "where pr.companyId = co._key\n" + + "order by co._key, pr._key "; + + /** */ + private static final int GRID_CNT = 6; + + /** */ + private static final int PERS_CNT = 600; + + /** */ + private static final int PURCHASE_CNT = 6000; + + /** */ + private static final int COMPANY_CNT = 25; + + /** */ + private static final int PRODUCT_CNT = 100; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + int i = 0; + + CacheConfiguration<?, ?>[] ccs = new CacheConfiguration[4]; + + for (String name : F.asList("pe", "pu", "co", "pr")) { + CacheConfiguration<?, ?> cc = defaultCacheConfiguration(); + + cc.setName(name); + cc.setCacheMode(PARTITIONED); + cc.setBackups(2); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setRebalanceMode(SYNC); + cc.setLongQueryWarningTimeout(15_000); + cc.setAffinity(new RendezvousAffinityFunction(false, 60)); + + switch (name) { + case "pe": + cc.setIndexedTypes( + Integer.class, Person.class + ); + + break; + + case "pu": + cc.setIndexedTypes( + Integer.class, Purchase.class + ); + + break; + + case "co": + cc.setIndexedTypes( + Integer.class, Company.class + ); + + break; + + case "pr": + cc.setIndexedTypes( + Integer.class, Product.class + ); + + break; + } + + ccs[i++] = cc; + } + + c.setCacheConfiguration(ccs); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(GRID_CNT); + + fillCaches(); + } + + /** + * + */ + private void fillCaches() { + IgniteCache<Integer, Company> co = grid(0).cache("co"); + + for (int i = 0; i < COMPANY_CNT; i++) + co.put(i, new Company(i)); + + IgniteCache<Integer, Product> pr = grid(0).cache("pr"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < PRODUCT_CNT; i++) + pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); + + IgniteCache<Integer, Person> pe = grid(0).cache("pe"); + + for (int i = 0; i < PERS_CNT; i++) + pe.put(i, new Person(i)); + + IgniteCache<Integer, Purchase> pu = grid(0).cache("pu"); + + for (int i = 0; i < PURCHASE_CNT; i++) { + int persId = rnd.nextInt(PERS_CNT); + int prodId = rnd.nextInt(PRODUCT_CNT); + + pu.put(i, new Purchase(persId, prodId)); + } + } + /** + * @throws Exception If failed. + */ + public void testRestarts() throws Exception { + restarts(false); + } + + /** + * @throws Exception If failed. + */ + public void testRestartsBroadcast() throws Exception { + restarts(true); + } + + /** + * @param broadcastQry If {@code true} tests broadcast query. + * @throws Exception If failed. + */ + private void restarts(final boolean broadcastQry) throws Exception { + int duration = 90 * 1000; + int qryThreadNum = 4; + int restartThreadsNum = 2; // 4 + 2 = 6 nodes + final int nodeLifeTime = 4000; + final int logFreq = 100; + + final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT); + + SqlFieldsQuery qry0 ; + + if (broadcastQry) + qry0 = new SqlFieldsQuery(QRY_0_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true); + else + qry0 = new SqlFieldsQuery(QRY_0).setDistributedJoins(true); + + String plan = queryPlan(grid(0).cache("pu"), qry0); + + X.println("Plan1: " + plan); + + assertEquals(broadcastQry, plan.contains("batched:broadcast")); + + final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll(); + + Thread.sleep(3000); + + assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll()); + + final SqlFieldsQuery qry1; + + if (broadcastQry) + qry1 = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true); + else + qry1 = new SqlFieldsQuery(QRY_1).setDistributedJoins(true); + + plan = queryPlan(grid(0).cache("co"), qry1); + + X.println("Plan2: " + plan); + + assertEquals(broadcastQry, plan.contains("batched:broadcast")); + + final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll(); + + assertFalse(pRes.isEmpty()); + assertFalse(rRes.isEmpty()); + + final AtomicInteger qryCnt = new AtomicInteger(); + final AtomicBoolean qrysDone = new AtomicBoolean(); + + IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + GridRandom rnd = new GridRandom(); + + while (!qrysDone.get()) { + int g; + + do { + g = rnd.nextInt(locks.length()); + } + while (!locks.compareAndSet(g, 0, 1)); + + if (rnd.nextBoolean()) { + IgniteCache<?, ?> cache = grid(g).cache("pu"); + + SqlFieldsQuery qry; + + if (broadcastQry) + qry = new SqlFieldsQuery(QRY_0_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true); + else + qry = new SqlFieldsQuery(QRY_0).setDistributedJoins(true); + + boolean smallPageSize = rnd.nextBoolean(); + + qry.setPageSize(smallPageSize ? 30 : 1000); + + try { + assertEquals(pRes, cache.query(qry).getAll()); + } + catch (CacheException e) { + assertTrue("On large page size must retry.", smallPageSize); + + boolean failedOnRemoteFetch = false; + + for (Throwable th = e; th != null; th = th.getCause()) { + if (!(th instanceof CacheException)) + continue; + + if (th.getMessage() != null && + th.getMessage().startsWith("Failed to fetch data from node:")) { + failedOnRemoteFetch = true; + + break; + } + } + + if (!failedOnRemoteFetch) { + e.printStackTrace(); + + fail("Must fail inside of GridResultPage.fetchNextPage or subclass."); + } + } + } + else { + IgniteCache<?, ?> cache = grid(g).cache("co"); + + SqlFieldsQuery qry; + + if (broadcastQry) + qry = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true); + else + qry = new SqlFieldsQuery(QRY_1).setDistributedJoins(true); + + assertEquals(rRes, cache.query(qry1).getAll()); + } + + locks.set(g, 0); + + int c = qryCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Executed queries: " + c); + } + } + }, qryThreadNum, "query-thread"); + + final AtomicInteger restartCnt = new AtomicInteger(); + + final AtomicBoolean restartsDone = new AtomicBoolean(); + + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @SuppressWarnings({"BusyWait"}) + @Override public Object call() throws Exception { + GridRandom rnd = new GridRandom(); + + while (!restartsDone.get()) { + int g; + + do { + g = rnd.nextInt(locks.length()); + } + while (!locks.compareAndSet(g, 0, -1)); + + log.info("Stop node: " + g); + + stopGrid(g); + + Thread.sleep(rnd.nextInt(nodeLifeTime)); + + log.info("Start node: " + g); + + startGrid(g); + + Thread.sleep(rnd.nextInt(nodeLifeTime)); + + locks.set(g, 0); + + int c = restartCnt.incrementAndGet(); + + if (c % logFreq == 0) + info("Node restarts: " + c); + } + + return true; + } + }, restartThreadsNum, "restart-thread"); + + Thread.sleep(duration); + + info("Stopping.."); + + restartsDone.set(true); + qrysDone.set(true); + + fut2.get(); + fut1.get(); + + info("Stopped."); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** + * @param id ID. + */ + Person(int id) { + this.id = id; + } + } + + /** + * + */ + private static class Purchase implements Serializable { + /** */ + @QuerySqlField(index = true) + int personId; + + /** */ + @QuerySqlField(index = true) + int productId; + + /** + * @param personId Person ID. + * @param productId Product ID. + */ + Purchase(int personId, int productId) { + this.personId = personId; + this.productId = productId; + } + } + + /** + * + */ + private static class Company implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** + * @param id ID. + */ + Company(int id) { + this.id = id; + } + } + + /** + * + */ + private static class Product implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** */ + @QuerySqlField(index = true) + int companyId; + + /** + * @param id ID. + * @param companyId Company ID. + */ + Product(int id, int companyId) { + this.id = id; + this.companyId = companyId; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java deleted file mode 100644 index 82456fb..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.reducefields; - -import java.io.Serializable; -import java.util.Collection; -import java.util.List; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.AffinityKey; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteReducer; -import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * Tests for reduce fields queries. - */ -public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Flag indicating if starting node should have cache. */ - protected boolean hasCache; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (hasCache) - cfg.setCacheConfiguration(cache(null)); - else - cfg.setCacheConfiguration(); - - cfg.setDiscoverySpi(discovery()); - - return cfg; - } - - /** - * @return Distribution. - */ - protected NearCacheConfiguration nearConfiguration() { - return new NearCacheConfiguration(); - } - - /** - * @param name Cache name. - * @return Cache. - */ - private CacheConfiguration cache(@Nullable String name) { - CacheConfiguration<?,?> cache = defaultCacheConfiguration(); - - cache.setName(name); - cache.setCacheMode(cacheMode()); - cache.setAtomicityMode(atomicityMode()); - cache.setNearConfiguration(nearConfiguration()); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setRebalanceMode(SYNC); - cache.setIndexedTypes( - String.class, Organization.class, - AffinityKey.class, Person.class - ); - - if (cacheMode() == PARTITIONED) - cache.setBackups(1); - - return cache; - } - - /** - * @return Discovery SPI. - */ - private static DiscoverySpi discovery() { - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - return spi; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - hasCache = true; - - startGridsMultiThreaded(gridCount()); - - hasCache = false; - - startGrid(gridCount()); - - IgniteCache<String, Organization> orgCache = grid(0).cache(null); - - assert orgCache != null; - - orgCache.put("o1", new Organization(1, "A")); - orgCache.put("o2", new Organization(2, "B")); - - IgniteCache<AffinityKey<String>, Person> personCache = grid(0).cache(null); - - assert personCache != null; - - personCache.put(new AffinityKey<>("p1", "o1"), new Person("John White", 25, 1)); - personCache.put(new AffinityKey<>("p2", "o1"), new Person("Joe Black", 35, 1)); - personCache.put(new AffinityKey<>("p3", "o2"), new Person("Mike Green", 40, 2)); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * @return cache mode. - */ - protected abstract CacheMode cacheMode(); - - /** - * @return Number of grids to start. - */ - protected abstract int gridCount(); - - /** - * @return Cache atomicity mode. - */ - protected CacheAtomicityMode atomicityMode() { - return TRANSACTIONAL; - } - - /** - * @throws Exception If failed. - */ - public void testNoDataInCache() throws Exception { - CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)) - .getCache(null).context().queries().createSqlFieldsQuery("select age from Person where orgId = 999", false); - - Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); - - assertEquals("Result", 0, F.reduce(res, new AverageLocalReducer()).intValue()); - } - - /** - * @throws Exception If failed. - */ - public void testAverageQuery() throws Exception { - CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).context().queries(). - createSqlFieldsQuery("select age from Person", false); - - Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get(); - - assertEquals("Average", 33, F.reduce(res, new AverageLocalReducer()).intValue()); - } - - /** - * @throws Exception If failed. - */ - public void testAverageQueryWithArguments() throws Exception { - CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).context().queries().createSqlFieldsQuery( - "select age from Person where orgId = ?", false); - - Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer(), 1).get(); - - assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); - } - -// /** -// * @throws Exception If failed. -// */ -// public void testFilters() throws Exception { -// GridCacheReduceFieldsQuery<Object, Object, GridBiTuple<Integer, Integer>, Integer> qry = ((IgniteKernal)grid(0)).cache(null) -// .queries().createReduceFieldsQuery("select age from Person"); -// -// qry = qry.remoteKeyFilter( -// new GridPredicate<Object>() { -// @Override public boolean apply(Object e) { -// return !"p2".equals(((AffinityKey)e).key()); -// } -// } -// ).remoteValueFilter( -// new P1<Object>() { -// @Override public boolean apply(Object e) { -// return !"Mike Green".equals(((Person)e).name); -// } -// } -// ); -// -// qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); -// -// Integer avg = qry.reduce().get(); -// -// assertNotNull("Average", avg); -// assertEquals("Average", 25, avg.intValue()); -// } - -// /** -// * @throws Exception If failed. -// */ -// public void testOnProjectionWithFilter() throws Exception { -// P2<AffinityKey<String>, Person> p = new P2<AffinityKey<String>, Person>() { -// @Override public boolean apply(AffinityKey<String> key, Person val) { -// return val.orgId == 1; -// } -// }; -// -// InternalCache<AffinityKey<String>, Person> cachePrj = -// grid(0).<AffinityKey<String>, Person>cache(null).projection(p); -// -// GridCacheReduceFieldsQuery<AffinityKey<String>, Person, GridBiTuple<Integer, Integer>, Integer> qry = -// cachePrj.queries().createReduceFieldsQuery("select age from Person"); -// -// qry = qry.remoteValueFilter( -// new P1<Person>() { -// @Override public boolean apply(Person e) { -// return !"Joe Black".equals(e.name); -// } -// }); -// -// qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer()); -// -// Integer avg = qry.reduce().get(); -// -// assertNotNull("Average", avg); -// assertEquals("Average", 25, avg.intValue()); -// } - - /** - * @return true if cache mode is replicated, false otherwise. - */ - private boolean isReplicatedMode() { - return cacheMode() == REPLICATED; - } - - /** - * Person. - */ - @SuppressWarnings("UnusedDeclaration") - private static class Person implements Serializable { - /** Name. */ - @QuerySqlField(index = false) - private final String name; - - /** Age. */ - @QuerySqlField(index = true) - private final int age; - - /** Organization ID. */ - @QuerySqlField(index = true) - private final int orgId; - - /** - * @param name Name. - * @param age Age. - * @param orgId Organization ID. - */ - private Person(String name, int age, int orgId) { - assert !F.isEmpty(name); - assert age > 0; - assert orgId > 0; - - this.name = name; - this.age = age; - this.orgId = orgId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - Person person = (Person)o; - - return age == person.age && orgId == person.orgId && name.equals(person.name); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = name.hashCode(); - - res = 31 * res + age; - res = 31 * res + orgId; - - return res; - } - } - - /** - * Organization. - */ - @SuppressWarnings("UnusedDeclaration") - private static class Organization implements Serializable { - /** ID. */ - @QuerySqlField - private final int id; - - /** Name. */ - @QuerySqlField(index = false) - private final String name; - - /** - * @param id ID. - * @param name Name. - */ - private Organization(int id, String name) { - assert id > 0; - assert !F.isEmpty(name); - - this.id = id; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - Organization that = (Organization)o; - - return id == that.id && name.equals(that.name); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = id; - - res = 31 * res + name.hashCode(); - - return res; - } - } - - /** - * Average remote reducer factory. - */ - protected static class AverageRemoteReducer implements IgniteReducer<List<?>, IgniteBiTuple<Integer, Integer>> { - /** */ - private int sum; - - /** */ - private int cnt; - - /** {@inheritDoc} */ - @Override public boolean collect(List<?> e) { - sum += (Integer)e.get(0); - - cnt++; - - return true; - } - - /** {@inheritDoc} */ - @Override public IgniteBiTuple<Integer, Integer> reduce() { - return F.t(sum, cnt); - } - } - - /** - * Average local reducer factory. - */ - protected static class AverageLocalReducer implements IgniteReducer<IgniteBiTuple<Integer, Integer>, Integer> { - /** */ - private int sum; - - /** */ - private int cnt; - - /** {@inheritDoc} */ - @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) { - sum += t.get1(); - cnt += t.get2(); - - return true; - } - - /** {@inheritDoc} */ - @Override public Integer reduce() { - return cnt == 0 ? 0 : sum / cnt; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java deleted file mode 100644 index 6aa467b..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.reducefields; - -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.NearCacheConfiguration; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; - -/** - * Reduce fields queries tests for partitioned cache. - */ -public class GridCacheReduceFieldsQueryAtomicSelfTest extends GridCacheReduceFieldsQueryPartitionedSelfTest { - /** {@inheritDoc} */ - @Override protected CacheAtomicityMode atomicityMode() { - return ATOMIC; - } - - /** {@inheritDoc} */ - @Override protected NearCacheConfiguration nearConfiguration() { - return null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java deleted file mode 100644 index 17f024e..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryLocalSelfTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.reducefields; - -import org.apache.ignite.cache.CacheMode; - -import static org.apache.ignite.cache.CacheMode.LOCAL; - -/** - * Reduce fields queries tests for local cache. - */ -public class GridCacheReduceFieldsQueryLocalSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return LOCAL; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java deleted file mode 100644 index f5b16f5..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryPartitionedSelfTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.reducefields; - -import java.util.List; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.lang.IgniteBiTuple; - -import static org.apache.ignite.cache.CacheMode.PARTITIONED; - -/** - * Reduce fields queries tests for partitioned cache. - */ -public class GridCacheReduceFieldsQueryPartitionedSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return PARTITIONED; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * @throws Exception If failed. - */ - public void testIncludeBackups() throws Exception { - CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).getCache(null).context().queries(). - createSqlFieldsQuery("select age from Person", false); - - qry.includeBackups(true); - - int sum = 0; - - for (IgniteBiTuple<Integer, Integer> tuple : qry.execute(new AverageRemoteReducer()).get()) - sum += tuple.get1(); - - // One backup, so sum is two times greater - assertEquals("Sum", 200, sum); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java deleted file mode 100644 index 3e08d6d..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryReplicatedSelfTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.reducefields; - -import org.apache.ignite.cache.CacheMode; - -import static org.apache.ignite.cache.CacheMode.REPLICATED; - -/** - * Reduce fields queries tests for replicated cache. - */ -public class GridCacheReduceFieldsQueryReplicatedSelfTest extends GridCacheAbstractReduceFieldsQuerySelfTest { - /** {@inheritDoc} */ - @Override protected CacheMode cacheMode() { - return REPLICATED; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java index 5c9acb5..88f1f1e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSchemaIndexingTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query; import java.util.List; import java.util.concurrent.Callable; +import javax.cache.CacheException; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -102,7 +103,7 @@ public class IgniteSqlSchemaIndexingTest extends GridCommonAbstractTest { return null; } - }, IgniteException.class, "Schema for cache already registered"); + }, IgniteException.class, "Cache already registered: "); } /** @@ -185,7 +186,7 @@ public class IgniteSqlSchemaIndexingTest extends GridCommonAbstractTest { cache.query(qryWrong); return null; } - }, IgniteException.class, "Failed to parse query"); + }, CacheException.class, "Failed to parse query"); SqlFieldsQuery qryCorrect = new SqlFieldsQuery("select f.\"id\", f.\"name\" " + "from \""+schemaName+"\".\"Fact\" f"); http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index fd52469..8ab70ba 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -24,10 +24,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import javax.cache.CacheException; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheKeyConfiguration; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; @@ -38,7 +43,9 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.springframework.util.StringUtils; /** * Tests for correct distributed partitioned queries. @@ -52,6 +59,10 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(TestKey.class.getName(), "affKey"); + + cfg.setCacheKeyConfiguration(keyCfg); + cfg.setPeerClassLoadingEnabled(false); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -97,6 +108,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { Integer.class, Integer.class)); try { + awaitPartitionMapExchange(); + List<Integer> res = new ArrayList<>(); Random rnd = new GridRandom(); @@ -136,6 +149,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { Integer.class, GroupIndexTestValue.class)); try { + awaitPartitionMapExchange(); + // Check group index usage. String qry = "select 1 from GroupIndexTestValue "; @@ -207,10 +222,659 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDistributedJoins() throws Exception { + CacheConfiguration ccfg = cacheConfig("persOrg", true, + Integer.class, Person2.class, Integer.class, Organization.class); + + IgniteCache<Integer, Object> c = ignite(0).getOrCreateCache(ccfg); + + try { + awaitPartitionMapExchange(); + + doTestDistributedJoins(c, 30, 100, 1000, false); + doTestDistributedJoins(c, 30, 100, 1000, true); + + doTestDistributedJoins(c, 3, 10, 3, false); + doTestDistributedJoins(c, 3, 10, 3, true); + + doTestDistributedJoins(c, 300, 2000, 5, false); + doTestDistributedJoins(c, 300, 2000, 5, true); + } + finally { + c.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDistributedJoinsUnion() throws Exception { + CacheConfiguration ccfg = cacheConfig("persOrg", true, + Integer.class, Person2.class, Integer.class, Organization.class); + + IgniteCache<Integer, Object> c = ignite(0).getOrCreateCache(ccfg); + + try { + c.put(1, new Organization("o1")); + c.put(2, new Organization("o2")); + c.put(3, new Person2(1, "p1")); + c.put(4, new Person2(2, "p2")); + c.put(5, new Person2(3, "p3")); + + String select = "select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=1" + + " union select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=2"; + + String plan = (String)c.query(new SqlFieldsQuery("explain " + select) + .setDistributedJoins(true).setEnforceJoinOrder(true)) + .getAll().get(0).get(0); + + X.println("Plan : " + plan); + + assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast")); + + assertEquals(2, c.query(new SqlFieldsQuery(select).setDistributedJoins(true) + .setEnforceJoinOrder(false)).getAll().size()); + + select = "select * from (" + select + ")"; + + plan = (String)c.query(new SqlFieldsQuery("explain " + select) + .setDistributedJoins(true).setEnforceJoinOrder(true)) + .getAll().get(0).get(0); + + X.println("Plan : " + plan); + + assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast")); + + assertEquals(2, c.query(new SqlFieldsQuery(select).setDistributedJoins(true) + .setEnforceJoinOrder(false)).getAll().size()); + } + finally { + c.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDistributedJoinsUnionPartitionedReplicated() throws Exception { + CacheConfiguration ccfg1 = cacheConfig("pers", true, + Integer.class, Person2.class); + CacheConfiguration ccfg2 = cacheConfig("org", false, + Integer.class, Organization.class); + + IgniteCache<Integer, Object> c1 = ignite(0).getOrCreateCache(ccfg1); + IgniteCache<Integer, Object> c2 = ignite(0).getOrCreateCache(ccfg2); + + try { + c2.put(1, new Organization("o1")); + c2.put(2, new Organization("o2")); + c1.put(3, new Person2(1, "p1")); + c1.put(4, new Person2(2, "p2")); + c1.put(5, new Person2(3, "p3")); + + String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" + + " union select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2"; + + String plan = (String)c1.query(new SqlFieldsQuery("explain " + select0) + .setDistributedJoins(true)) + .getAll().get(0).get(0); + + X.println("Plan: " + plan); + + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll().size()); + + String select = "select * from (" + select0 + ")"; + + plan = (String)c1.query(new SqlFieldsQuery("explain " + select) + .setDistributedJoins(true)) + .getAll().get(0).get(0); + + X.println("Plan : " + plan); + + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); + + String select1 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" + + " union select * from (select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2)"; + + plan = (String)c1.query(new SqlFieldsQuery("explain " + select1) + .setDistributedJoins(true)).getAll().get(0).get(0); + + X.println("Plan: " + plan); + + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); + + select = "select * from (" + select1 + ")"; + + plan = (String)c1.query(new SqlFieldsQuery("explain " + select) + .setDistributedJoins(true)).getAll().get(0).get(0); + + X.println("Plan : " + plan); + + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testDistributedJoinsPlan() throws Exception { + List<IgniteCache<Object, Object>> caches = new ArrayList<>(); + + IgniteCache<Object, Object> persPart = + ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class)); + caches.add(persPart); + + IgniteCache<Object, Object> persPartAff = + ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class)); + caches.add(persPartAff); + + IgniteCache<Object, Object> orgPart = + ignite(0).createCache(cacheConfig("orgPart", true, Integer.class, Organization.class)); + caches.add(orgPart); + + IgniteCache<Object, Object> orgPartAff = + ignite(0).createCache(cacheConfig("orgPartAff", true, TestKey.class, Organization.class)); + caches.add(orgPartAff); + + IgniteCache<Object, Object> orgRepl = + ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class)); + caches.add(orgRepl); + + IgniteCache<Object, Object> orgRepl2 = + ignite(0).createCache(cacheConfig("orgRepl2", false, Integer.class, Organization.class)); + caches.add(orgRepl2); + + try { + // Join two partitioned. + + checkQueryPlan(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPart\".Organization o " + + "where p.orgId = o._key", + "batched:unicast"); + + checkQueryPlan(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPartAff\".Organization o " + + "where p.orgId = o.affKey", + "batched:unicast"); + + checkQueryPlan(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgPart\".Organization o " + + "where p.orgId = o._key", + "batched:unicast"); + + checkQueryPlan(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p inner join \"orgPart\".Organization o " + + "on p.orgId = o._key", + "batched:unicast"); + + checkQueryPlan(persPart, + false, + 1, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p left outer join \"orgPart\".Organization o " + + "on p.orgId = o._key", + "batched:unicast"); + + checkQueryPlan(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgPart\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o._key", + "batched:broadcast"); + + checkQueryPlan(persPart, + true, + 1, + "select p._key k1, o._key k2 " + + "from \"orgPartAff\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o.affKey", + "batched:broadcast"); + + // Join partitioned and replicated. + + checkQueryPlan(persPart, + true, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgRepl\".Organization o " + + "where p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, \"orgRepl\".Organization o " + + "where p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, (select * from \"orgRepl\".Organization) o " + + "where p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from (select * from \"orgRepl\".Organization) o, \"persPart\".Person2 p " + + "where p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p inner join \"orgRepl\".Organization o " + + "on p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p left outer join \"orgRepl\".Organization o " + + "on p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"orgRepl\".Organization o, \"persPart\".Person2 p " + + "where p.orgId = o._key"); + + checkQueryPlan(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from \"orgRepl\".Organization o inner join \"persPart\".Person2 p " + + "on p.orgId = o._key"); + +// checkQueryPlan(persPart, +// true, +// 1, +// "select p._key k1, o._key k2 " + +// "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " + +// "on p.orgId = o._key", +// "batched:broadcast"); + + // Join on affinity keys. + + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "\"persPart\".Person2 p", + "\"orgPart\".Organization o", + "where p._key = o._key", true); + + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "\"persPart\".Person2 p", + "\"orgRepl\".Organization o", + "where p._key = o._key", true); + + checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ", + "\"persPartAff\".Person2 p", + "\"orgPart\".Organization o", + "where p.affKey = o._key", true); + + checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ", + "\"persPartAff\".Person2 p", + "\"orgRepl\".Organization o", + "where p.affKey = o._key", true); + + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "(select * from \"persPart\".Person2) p", + "\"orgPart\".Organization o", + "where p._key = o._key", false); + + checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ", + "\"persPart\".Person2 p", + "(select * from \"orgPart\".Organization) o", + "where p._key = o._key", false); + + // Join multiple. + + { + String sql = "select * from " + + "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " + + "\"persPart\".Person2 p where p.orgId = o.k1"; + + checkQueryPlan(persPart, + false, + 0, + sql); + + checkQueryPlan(persPart, + true, + 0, + sql); + + sql = "select o.k1, p1._key k2, p2._key k3 from " + + "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " + + "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 where p1._key=p2._key and p2.orgId = o.k1"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "persPartAff", "persPart", "batched:unicast", "orgRepl"); + + checkQueryFails(persPart, sql, true); + + sql = "select o.k1, p._key k2 from " + + "(select o1._key k1, p1._key k2 from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 where o1._key = p1.orgId) o, " + + "\"persPartAff\".Person2 p where p._key=o.k1"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "FROM \"persPart\"", "INNER JOIN \"orgRepl\"", "INNER JOIN \"persPartAff\"", "batched:broadcast"); + + checkQueryFails(persPart, sql, true); + } + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " + + "where p1.affKey=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + true, + 2, + sql, + "batched:unicast", "batched:unicast"); + + checkQueryPlan(persPart, + false, + 2, + sql, + "batched:unicast", "batched:unicast"); + } + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " + + "where p1.affKey > p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + true, + 2, + sql, + "batched:broadcast", "batched:unicast"); + + checkQueryPlan(persPart, + false, + 2, + sql, + "batched:broadcast", "batched:unicast"); + } + + { + // First join is collocated, second is replicated. + + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + + "where p1.affKey=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + true, + 0, + sql); + + checkQueryPlan(persPart, + false, + 0, + sql); + } + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from (select * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlan(persPart, + false, + 1, + sql, + "batched:unicast"); + } + } + finally { + for (IgniteCache<Object, Object> cache : caches) + ignite(0).destroyCache(cache.getName()); + } + } + /** + * @throws Exception If failed. + */ + public void testDistributedJoinsEnforceReplicatedNotLast() throws Exception { + List<IgniteCache<Object, Object>> caches = new ArrayList<>(); + + IgniteCache<Object, Object> persPart = + ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class)); + caches.add(persPart); + + IgniteCache<Object, Object> persPartAff = + ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class)); + caches.add(persPartAff); + + IgniteCache<Object, Object> orgRepl = + ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class)); + caches.add(orgRepl); + + try { + checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " + + "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key", true); + + checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key", true); + + checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, (select * from \"orgRepl\".Organization) o, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key", true); + + checkQueryPlan(persPart, + true, + 0, + "select p._key k1, o._key k2 from \"orgRepl\".Organization o, \"persPart\".Person2 p"); + + checkQueryPlan(persPart, + true, + 0, + "select p._key k1, o._key k2 from \"orgRepl\".Organization o, \"persPart\".Person2 p union " + + "select p._key k1, o._key k2 from \"persPart\".Person2 p, \"orgRepl\".Organization o"); + } + finally { + for (IgniteCache<Object, Object> cache : caches) + ignite(0).destroyCache(cache.getName()); + } + } + + /** + * @param cache Cache. + * @param sql SQL. + * @param enforceJoinOrder Enforce join order flag. + */ + private void checkQueryFails(final IgniteCache<Object, Object> cache, + String sql, + boolean enforceJoinOrder) { + final SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.query(qry); + + return null; + } + }, CacheException.class, null); + } + + /** + * @param cache Query cache. + * @param select Select clause. + * @param cache1 Cache name1. + * @param cache2 Cache name2. + * @param where Where clause. + * @param testEnforceJoinOrder If {@code true} tests query with enforced join order. + */ + private void checkNoBatchedJoin(IgniteCache<Object, Object> cache, + String select, + String cache1, + String cache2, + String where, + boolean testEnforceJoinOrder) { + checkQueryPlan(cache, + false, + 0, + select + + "from " + cache1 + "," + cache2 + " "+ where); + + checkQueryPlan(cache, + false, + 0, + select + + "from " + cache2 + "," + cache1 + " "+ where); + + if (testEnforceJoinOrder) { + checkQueryPlan(cache, + true, + 0, + select + + "from " + cache1 + "," + cache2 + " "+ where); + + checkQueryPlan(cache, + true, + 0, + select + + "from " + cache2 + "," + cache1 + " "+ where); + } + } + + /** + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expBatchedJoins Expected batched joins count. + * @param sql Query. + * @param expText Expected text to find in plan. + */ + private void checkQueryPlan(IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expBatchedJoins, + String sql, + String...expText) { + checkQueryPlan(cache, + enforceJoinOrder, + expBatchedJoins, + new SqlFieldsQuery(sql), + expText); + + checkQueryPlan(cache, + enforceJoinOrder, + expBatchedJoins, + new SqlFieldsQuery("select * from (" + sql + ")"), + expText); + } + + /** + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expBatchedJoins Expected batched joins count. + * @param qry Query. + * @param expText Expected text to find in plan. + */ + private void checkQueryPlan(IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expBatchedJoins, + SqlFieldsQuery qry, + String...expText) { + qry.setEnforceJoinOrder(enforceJoinOrder); + qry.setDistributedJoins(true); + + String plan = queryPlan(cache, qry); + + log.info("Plan: " + plan); + + assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']', + expBatchedJoins, + StringUtils.countOccurrencesOf(plan, "batched")); + + int startIdx = 0; + + for (String exp : expText) { + int idx = plan.indexOf(exp, startIdx); + + if (idx == -1) { + fail("Plan does not contain expected string [startIdx=" + startIdx + + ", plan=" + plan + + ", exp=" + exp + ']'); + } + + startIdx = idx + 1; + } + } + + /** * Test HAVING clause. */ public void testHaving() { - IgniteCache<Integer, Integer> c = ignite(0).getOrCreateCache(cacheConfig("ints", true, + IgniteCache<Integer, Integer> c = ignite(0).getOrCreateCache(cacheConfig("having", true, Integer.class, Integer.class)); try { @@ -256,6 +920,61 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { /** * @param c Cache. + * @param orgs Number of organizations. + * @param persons Number of persons. + * @param pageSize Page size. + * @param enforceJoinOrder Enforce join order. + */ + private void doTestDistributedJoins(IgniteCache<Integer, Object> c, int orgs, int persons, int pageSize, + boolean enforceJoinOrder) { + assertEquals(0, c.size(CachePeekMode.ALL)); + + int key = 0; + + for (int i = 0; i < orgs; i++) { + Organization o = new Organization(); + + o.name = "Org" + i; + + c.put(key++, o); + } + + Random rnd = new GridRandom(); + + for (int i = 0; i < persons; i++) { + Person2 p = new Person2(); + + p.name = "Person" + i; + p.orgId = rnd.nextInt(orgs); + + c.put(key++, p); + } + + String select = "select count(*) from Organization o, Person2 p where p.orgId = o._key"; + + String plan = (String)c.query(new SqlFieldsQuery("explain " + select) + .setDistributedJoins(true).setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)) + .getAll().get(0).get(0); + + X.println("Plan : " + plan); + + if (enforceJoinOrder) + assertTrue(plan, plan.contains("batched:broadcast")); + else + assertTrue(plan, plan.contains("batched:unicast")); + + assertEquals(Long.valueOf(persons), c.query(new SqlFieldsQuery(select).setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0)); + + c.clear(); + + assertEquals(0, c.size(CachePeekMode.ALL)); + assertEquals(0L, c.query(new SqlFieldsQuery(select).setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0)); + } + + /** + * @param c Cache. * @param qry Query. * @param args Arguments. * @return Column as list. @@ -282,8 +1001,6 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { * */ public void testFunctionNpe() { - assert false : "https://issues.apache.org/jira/browse/IGNITE-1886"; - IgniteCache<Integer, User> userCache = ignite(0).createCache( cacheConfig("UserCache", true, Integer.class, User.class)); IgniteCache<Integer, UserOrder> userOrderCache = ignite(0).createCache( @@ -391,9 +1108,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { * Test value. */ private static class GroupIndexTestValue implements Serializable { + /** */ @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 0)) private int a; + /** */ @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 1)) private int b; @@ -407,23 +1126,131 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { } } + /** + * + */ + private static class Person2 implements Serializable { + /** */ + @QuerySqlField(index = true) + int orgId; + + /** */ + @QuerySqlField + String name; + + /** + * + */ + public Person2() { + // No-op. + } + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person2(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** + * + */ + private static class TestKey implements Serializable { + /** */ + @QuerySqlField(index = true) + @AffinityKeyMapped + int affKey; + + /** */ + @QuerySqlField() + int id; + + /** + * @param affKey Affinity key. + * @param id ID. + */ + public TestKey(int affKey, int id) { + this.affKey = affKey; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey personKey = (TestKey)o; + + return id == personKey.id; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + @QuerySqlField + String name; + + /** + * + */ + public Organization() { + // No-op. + } + + /** + * @param name Organization name. + */ + public Organization(String name) { + this.name = name; + } + } + + /** + * + */ private static class User implements Serializable { + /** */ @QuerySqlField private int id; } + /** + * + */ private static class UserOrder implements Serializable { + /** */ @QuerySqlField private int id; + /** */ @QuerySqlField private int userId; } + /** + * + */ private static class OrderGood implements Serializable { + /** */ @QuerySqlField private int orderId; + /** */ @QuerySqlField private int goodId; }
