Repository: ignite Updated Branches: refs/heads/ignite-1232 45ce0fd7d -> b7826b28a
ignite-1232 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7826b28 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7826b28 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7826b28 Branch: refs/heads/ignite-1232 Commit: b7826b28af1a9cae05a8a46d82df4b74ef251b3b Parents: 45ce0fd Author: sboikov <[email protected]> Authored: Thu Jul 7 14:25:23 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 7 16:56:55 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 7 + .../processors/query/GridQueryProcessor.java | 1 + .../processors/query/h2/IgniteH2Indexing.java | 10 +- .../query/h2/opt/GridH2QueryContext.java | 24 +- .../processors/query/h2/opt/GridH2Table.java | 4 +- .../query/h2/opt/GridH2TreeIndex.java | 26 +- ...ributedJoinPartitionedAndReplicatedTest.java | 36 +- .../IgniteCacheDistributedJoinQueryTest.java | 43 ++ ...PartitionedAndReplicatedCollocationTest.java | 397 +++++++++++++++++++ ...teCacheJoinPartitionedAndReplicatedTest.java | 42 ++ .../h2/sql/AbstractH2CompareQueryTest.java | 32 +- 11 files changed, 593 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 36d9104..bd38355 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -567,6 +567,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if cache is partitioned cache. + */ + public boolean isPartitioned() { + return cacheCfg.getCacheMode() == CacheMode.PARTITIONED; + } + + /** * @return {@code True} in case replication is enabled. */ public boolean isDrEnabled() { http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index a58741a..05fd052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -768,6 +768,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. * @param qry Query. + * @param keepBinary Keep binary flag. * @return Cursor. */ public <K, V> Iterator<Cache.Entry<K, V>> queryLocal( http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index edc35cb..ab8137a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1072,14 +1072,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * @param cctx Cache context. - * @return {@code true} If the given cache is partitioned. - */ - public static boolean isPartitioned(GridCacheContext<?,?> cctx) { - return !cctx.isReplicated() && !cctx.isLocal(); - } - - /** * @param c Connection. * @return Session. */ @@ -1095,7 +1087,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { Connection c = connectionForSpace(space); final boolean enforceJoinOrder = qry.isEnforceJoinOrder(); - final boolean distributedJoins = qry.isDistributedJoins() && isPartitioned(cctx); + final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned(); final boolean grpByCollocated = qry.isCollocated(); GridCacheTwoStepQuery twoStepQry; http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java index 223dad6..ef532dd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java @@ -85,7 +85,7 @@ public class GridH2QueryContext { private int pageSize; /** */ - private GridH2CollocationModel qryCollocationModel; + private GridH2CollocationModel qryCollocationMdl; /** * @param locNodeId Local node ID. @@ -122,14 +122,14 @@ public class GridH2QueryContext { * @return Query collocation model. */ public GridH2CollocationModel queryCollocationModel() { - return qryCollocationModel; + return qryCollocationMdl; } /** - * @param qryCollocationModel Query collocation model. + * @param qryCollocationMdl Query collocation model. */ - public void queryCollocationModel(GridH2CollocationModel qryCollocationModel) { - this.qryCollocationModel = qryCollocationModel; + public void queryCollocationModel(GridH2CollocationModel qryCollocationMdl) { + this.qryCollocationMdl = qryCollocationMdl; } /** @@ -557,13 +557,13 @@ public class GridH2QueryContext { /** {@inheritDoc} */ @Override public int hashCode() { - int result = locNodeId.hashCode(); + int res = locNodeId.hashCode(); - result = 31 * result + nodeId.hashCode(); - result = 31 * result + (int)(qryId ^ (qryId >>> 32)); - result = 31 * result + type.hashCode(); + res = 31 * res + nodeId.hashCode(); + res = 31 * res + (int)(qryId ^ (qryId >>> 32)); + res = 31 * res + type.hashCode(); - return result; + return res; } /** {@inheritDoc} */ @@ -593,9 +593,9 @@ public class GridH2QueryContext { /** {@inheritDoc} */ @Override public boolean equals(Object o) { - SourceKey sourceKey = (SourceKey)o; + SourceKey srcKey = (SourceKey)o; - return batchLookupId == sourceKey.batchLookupId && ownerId.equals(sourceKey.ownerId); + return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 8fd2880..012fb0b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -156,7 +156,7 @@ public class GridH2Table extends TableBase { * @return {@code true} If this is a partitioned table. */ public boolean isPartitioned() { - return desc != null && IgniteH2Indexing.isPartitioned(desc.context()); + return desc != null && desc.context().isPartitioned(); } /** @@ -909,7 +909,7 @@ public class GridH2Table extends TableBase { /** {@inheritDoc} */ @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) { - return find(filter.getSession(), first, last); + return delegate.find(filter, first, last); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 0b85284..7b7e0fa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -478,6 +478,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS } /** {@inheritDoc} */ + @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) { + return new GridH2Cursor(doFind(first, true, last)); + } + + /** {@inheritDoc} */ @Override public Cursor find(Session ses, @Nullable SearchRow first, @Nullable SearchRow last) { return new GridH2Cursor(doFind(first, true, last)); } @@ -507,11 +512,28 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS * @param last Upper bound always inclusive. * @return Iterator over rows in given range. */ + private Iterator<GridH2Row> doFind(@Nullable SearchRow first, + boolean includeFirst, + @Nullable SearchRow last) { + return doFind(first, includeFirst, last, true); + } + + /** + * Returns sub-tree bounded by given values. + * + * @param first Lower bound. + * @param includeFirst Whether lower bound should be inclusive. + * @param last Upper bound always inclusive. + * @return Iterator over rows in given range. + */ @SuppressWarnings("unchecked") - private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) { + private Iterator<GridH2Row> doFind(@Nullable SearchRow first, + boolean includeFirst, + @Nullable SearchRow last, + boolean filter) { ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(); - return doFind0(t, first, includeFirst, last, threadLocalFilter()); + return doFind0(t, first, includeFirst, last, filter ? threadLocalFilter() : null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java index 56a2f11..4c71a2e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java @@ -163,12 +163,12 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid public void testJoin() throws Exception { Ignite client = grid(2); - Affinity<Object> aff = client.affinity(PERSON_CACHE); - IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE); + Affinity<Object> aff = client.affinity(PERSON_CACHE); + AtomicInteger pKey = new AtomicInteger(100_000); AtomicInteger orgKey = new AtomicInteger(); AtomicInteger accKey = new AtomicInteger(); @@ -190,6 +190,10 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1, "a0")); accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1, "a1")); +// checkQuery("select p._key, p.name, a.name " + +// "from \"person\".Person p, \"acc\".Account a " + +// "where p._key = a.personId", orgCache, true, 2); + checkQuery("select o.name, p._key, p.name, a.name " + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); @@ -201,6 +205,34 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid checkQuery("select o.name, p._key, p.name, a.name " + "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " + "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + + String[] cacheNames = {"\"org\".Organization o", "\"person\".Person p", "\"acc\".Account a"}; + + for (int c1 = 0; c1 < cacheNames.length; c1++) { + for (int c2 = 0; c2 < cacheNames.length; c2++) { + if (c2 == c1) + continue; + + for (int c3 = 0; c3 < cacheNames.length; c3++) { + if (c3 == c1 || c3 == c2) + continue; + + String cache1 = cacheNames[c1]; + String cache2 = cacheNames[c2]; + String cache3 = cacheNames[c3]; + + StringBuilder qry = new StringBuilder("select o.name, p._key, p.name, a.name from "). + append(cache1).append(", "). + append(cache2).append(", "). + append(cache3).append(" "). + append("where p.orgId = o._key and p._key = a.personId"); + + checkQuery(qry.toString(), orgCache, true, 2); + + checkQuery(qry.toString(), orgCache, false, 2); + } + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java index 0827bac..e4ee2fd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java @@ -108,6 +108,9 @@ public class IgniteCacheDistributedJoinQueryTest extends GridCommonAbstractTest List<Integer> orgIds = putData1(); + checkQuery("select _key, name from \"org\".Organization o " + + "inner join (select orgId from Person) p on p.orgId = o._key", pCache, total); + checkQuery("select o._key, o.name, p._key, p.name " + "from \"org\".Organization o, Person p " + "where p.orgId = o._key", pCache, total); @@ -336,6 +339,46 @@ public class IgniteCacheDistributedJoinQueryTest extends GridCommonAbstractTest } /** + * @throws Exception If failed. + */ + public void testJoinQuery5() throws Exception { + Ignite client = grid(2); + + try { + CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false))); + CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false))); + + IgniteCache<Object, Object> pCache = client.createCache(ccfg1); + IgniteCache<Object, Object> orgCache = client.createCache(ccfg2); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger pKey = new AtomicInteger(); + + Integer orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + orgId)); + + Integer pId = keyForNode(aff, pKey, node1); + + pCache.put(pId, new Person(orgId, "p-" + orgId)); + + checkQuery("select o._key from \"org\".Organization o, Person p where p.orgId = o._key", pCache, 1); + + checkQuery("select o.name from \"org\".Organization o where o._key in " + + "(select o._key from \"org\".Organization o, Person p where p.orgId = o._key)", pCache, 1); + } + finally { + client.destroyCache(PERSON_CACHE); + client.destroyCache(ORG_CACHE); + } + } + + /** * @param sql SQL. * @param cache Cache. * @param expSize Expected results size. http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java new file mode 100644 index 0000000..3c0449b --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends AbstractH2CompareQueryTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ACCOUNT_CACHE = "acc"; + + /** */ + private boolean client; + + /** */ + private boolean h2DataInserted; + + /** {@inheritDoc} */ + @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void initCacheAndDbData() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void checkAllDataEquals() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration personCache() { + CacheConfiguration ccfg = configuration(PERSON_CACHE, 0); + + // Person cache is replicated. + ccfg.setCacheMode(REPLICATED); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + return ccfg; + } + + /** + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration accountCache(int backups) { + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, backups); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + return ccfg; + } + + /** + * @param name Cache name. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name, int backups) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(backups); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + client = true; + + startGrid(SRVS); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected Statement initializeH2Schema() throws SQLException { + Statement st = super.initializeH2Schema(); + + st.execute("CREATE SCHEMA \"person\""); + st.execute("CREATE SCHEMA \"acc\""); + + st.execute("create table \"person\".PERSON" + + " (_key int not null," + + " _val other not null," + + " name varchar(255))"); + + st.execute("create table \"acc\".ACCOUNT" + + " (_key int not null," + + " _val other not null," + + " personId int," + + " name varchar(255))"); + + return st; + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(SRVS); + + client.createCache(personCache()); + + checkJoin(0); + + h2DataInserted = true; + + checkJoin(1); + + checkJoin(2); + } + + /** + * @param accBackups Account cache backups. + * @throws Exception If failed. + */ + private void checkJoin(int accBackups) throws Exception { + Ignite client = grid(SRVS); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger accKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + try { + IgniteCache<Object, Object> accCache = client.createCache(accountCache(accBackups)); + + Integer pKey1 = keyForNode(aff, pKey, node0); // No accounts. + insert(personCache, pKey1, new Person("p1")); + + Integer pKey2 = keyForNode(aff, pKey, node0); // 1 collocated account. + insert(personCache, pKey2, new Person("p2")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey2, "a-p2")); + + Integer pKey3 = keyForNode(aff, pKey, node0); // 1 non-collocated account. + insert(personCache, pKey3, new Person("p3")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey3, "a-p3")); + + Integer pKey4 = keyForNode(aff, pKey, node0); // 1 collocated, 1 non-collocated account. + insert(personCache, pKey4, new Person("p4")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey4, "a-p4-1")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey4, "a-p4-2")); + + Integer pKey5 = keyForNode(aff, pKey, node0); // 2 collocated accounts. + insert(personCache, pKey5, new Person("p5")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey5, "a-p5-1")); + insert(accCache, keyForNode(aff, accKey, node0), new Account(pKey5, "a-p5-1")); + + Integer pKey6 = keyForNode(aff, pKey, node0); // 2 non-collocated accounts. + insert(personCache, pKey6, new Person("p6")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey6, "a-p5-1")); + insert(accCache, keyForNode(aff, accKey, node1), new Account(pKey6, "a-p5-1")); + + Integer[] keys = {pKey1, pKey2, pKey3, pKey4, pKey5, pKey6}; + + for (int i = 0; i < keys.length; i++) { + log.info("Test key: " + i); + + Integer key = keys[i]; + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p, \"acc\".Account a " + + "where p._key = a.personId and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"acc\".Account a, \"person\".Person p " + + "where p._key = a.personId and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p left outer join \"acc\".Account a " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"person\".Person p right outer join \"acc\".Account a " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"acc\".Account a left outer join \"person\".Person p " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + + checkQuery("select p._key, p.name, a.name " + + "from \"acc\".Account a right outer join \"person\".Person p " + + "on (p._key = a.personId) and p._key=?", accCache, true, key); + } + } + finally { + client.destroyCache(ACCOUNT_CACHE); + + personCache.removeAll(); + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param p Person. + * @throws Exception If failed. + */ + private void insert(IgniteCache<Object, Object> cache, int key, Person p) throws Exception { + cache.put(key, p); + + if (h2DataInserted) + return; + + try(PreparedStatement st = conn.prepareStatement("insert into \"person\".PERSON " + + "(_key, _val, name) values(?, ?, ?)")) { + st.setObject(1, key); + st.setObject(2, p); + st.setObject(3, p.name); + + st.executeUpdate(); + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param a Account. + * @throws Exception If failed. + */ + private void insert(IgniteCache<Object, Object> cache, int key, Account a) throws Exception { + cache.put(key, a); + + if (h2DataInserted) + return; + + try(PreparedStatement st = conn.prepareStatement("insert into \"acc\".ACCOUNT " + + "(_key, _val, personId, name) values(?, ?, ?, ?)")) { + st.setObject(1, key); + st.setObject(2, a); + st.setObject(3, a.personId); + st.setObject(4, a.name); + + st.executeUpdate(); + } + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param args Arguments. + * @throws Exception If failed. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + Object... args) throws Exception { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setArgs(args) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + compareQueryRes0(cache, sql, true, enforceJoinOrder, args, Ordering.RANDOM); + } + + /** + * + */ + private static class Account implements Serializable { + /** */ + int personId; + + /** */ + String name; + + /** + * @param personId Person ID. + * @param name Name. + */ + public Account(int personId, String name) { + this.personId = personId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Person(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java index 39c0cb5..bd773ce 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java @@ -53,6 +53,9 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr private static final String ORG_CACHE = "org"; /** */ + private static final String ORG_CACHE_REPLICATED = "orgRepl"; + + /** */ private boolean client; /** {@inheritDoc} */ @@ -95,6 +98,20 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr ccfgs.add(ccfg); } + { + CacheConfiguration ccfg = configuration(ORG_CACHE_REPLICATED); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("id", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); cfg.setClientMode(client); @@ -144,10 +161,12 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + IgniteCache<Object, Object> orgCacheRepl = client.cache(ORG_CACHE_REPLICATED); List<Integer> keys = primaryKeys(ignite(0).cache(PERSON_CACHE), 3, 200_000); orgCache.put(keys.get(0), new Organization(0, "org1")); + orgCacheRepl.put(keys.get(0), new Organization(0, "org1")); personCache.put(keys.get(1), new Person(0, "p1")); personCache.put(keys.get(2), new Person(0, "p2")); @@ -166,6 +185,28 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr checkQuery("select o.name, p._key, p.name " + "from \"org\".Organization o left join \"person\".Person p " + "on (p.orgId = o.id)", orgCache, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"person\".Person p join \"orgRepl\".Organization o " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"orgRepl\".Organization o join \"person\".Person p " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"person\".Person p left join \"orgRepl\".Organization o " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select o.name, p._key, p.name " + + "from \"orgRepl\".Organization o left join \"person\".Person p " + + "on (p.orgId = o.id)", orgCacheRepl, 2); + + checkQuery("select p.name from \"person\".Person p ", ignite(0).cache(PERSON_CACHE), 2); + checkQuery("select p.name from \"person\".Person p ", ignite(1).cache(PERSON_CACHE), 2); + + for (int i = 0; i < 10; i++) + checkQuery("select p.name from \"person\".Person p ", personCache, 2); } /** @@ -233,6 +274,7 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr int id; /** + * @param id ID. * @param name Name. */ public Organization(int id, String name) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java index d936514..fe0fb12 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java @@ -56,6 +56,9 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest /** */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** */ + protected static final int SRVS = 4; + /** Partitioned cache. */ protected static IgniteCache pCache; @@ -122,7 +125,7 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - Ignite ignite = startGrids(4); + Ignite ignite = startGrids(SRVS); pCache = ignite.cache("part"); @@ -268,10 +271,32 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest * @return Result set after SQL query execution. * @throws SQLException If exception. */ + protected static List<List<?>> compareQueryRes0(IgniteCache cache, + String sql, + boolean distrib, + @Nullable Object[] args, + Ordering ordering) throws SQLException { + return compareQueryRes0(cache, sql, distrib, false, args, ordering); + } + + /** + * Execute given sql query on h2 database and on ignite cache and compare results. + * + * @param cache Ignite cache. + * @param sql SQL query. + * @param distrib Distributed SQL Join flag. + * @param enforceJoinOrder Enforce join order flag. + * @param args SQL arguments. + * @param ordering Expected ordering of SQL results. If {@link Ordering#ORDERED} + * then results will compare as ordered queries. + * @return Result set after SQL query execution. + * @throws SQLException If exception. + */ @SuppressWarnings("unchecked") protected static List<List<?>> compareQueryRes0(IgniteCache cache, String sql, boolean distrib, + boolean enforceJoinOrder, @Nullable Object[] args, Ordering ordering) throws SQLException { if (args == null) @@ -286,7 +311,10 @@ public abstract class AbstractH2CompareQueryTest extends GridCommonAbstractTest // // X.println("Plan : " + plan); - List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).setArgs(args).setDistributedJoins(distrib)).getAll(); + List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql). + setArgs(args). + setDistributedJoins(distrib). + setEnforceJoinOrder(enforceJoinOrder)).getAll(); try { assertRsEquals(h2Res, cacheRes, ordering);
