Repository: ignite
Updated Branches:
  refs/heads/ignite-1232 45ce0fd7d -> b7826b28a


ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7826b28
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7826b28
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7826b28

Branch: refs/heads/ignite-1232
Commit: b7826b28af1a9cae05a8a46d82df4b74ef251b3b
Parents: 45ce0fd
Author: sboikov <[email protected]>
Authored: Thu Jul 7 14:25:23 2016 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jul 7 16:56:55 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |   7 +
 .../processors/query/GridQueryProcessor.java    |   1 +
 .../processors/query/h2/IgniteH2Indexing.java   |  10 +-
 .../query/h2/opt/GridH2QueryContext.java        |  24 +-
 .../processors/query/h2/opt/GridH2Table.java    |   4 +-
 .../query/h2/opt/GridH2TreeIndex.java           |  26 +-
 ...ributedJoinPartitionedAndReplicatedTest.java |  36 +-
 .../IgniteCacheDistributedJoinQueryTest.java    |  43 ++
 ...PartitionedAndReplicatedCollocationTest.java | 397 +++++++++++++++++++
 ...teCacheJoinPartitionedAndReplicatedTest.java |  42 ++
 .../h2/sql/AbstractH2CompareQueryTest.java      |  32 +-
 11 files changed, 593 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 36d9104..bd38355 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -567,6 +567,13 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * @return {@code True} if the cache mode is {@code PARTITIONED}.
+     */
+    public boolean isPartitioned() {
+        return cacheCfg.getCacheMode() == CacheMode.PARTITIONED;
+    }
+
+    /**
      * @return {@code True} in case replication is enabled.
      */
     public boolean isDrEnabled() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a58741a..05fd052 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -768,6 +768,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     /**
      * @param cctx Cache context.
      * @param qry Query.
+     * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
     public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index edc35cb..ab8137a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1072,14 +1072,6 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /**
-     * @param cctx Cache context.
-     * @return {@code true} If the given cache is partitioned.
-     */
-    public static boolean isPartitioned(GridCacheContext<?,?> cctx) {
-        return !cctx.isReplicated() && !cctx.isLocal();
-    }
-
-    /**
      * @param c Connection.
      * @return Session.
      */
@@ -1095,7 +1087,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
         Connection c = connectionForSpace(space);
 
         final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
-        final boolean distributedJoins = qry.isDistributedJoins() && 
isPartitioned(cctx);
+        final boolean distributedJoins = qry.isDistributedJoins() && 
cctx.isPartitioned();
         final boolean grpByCollocated = qry.isCollocated();
 
         GridCacheTwoStepQuery twoStepQry;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 223dad6..ef532dd 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -85,7 +85,7 @@ public class GridH2QueryContext {
     private int pageSize;
 
     /** */
-    private GridH2CollocationModel qryCollocationModel;
+    private GridH2CollocationModel qryCollocationMdl;
 
     /**
      * @param locNodeId Local node ID.
@@ -122,14 +122,14 @@ public class GridH2QueryContext {
      * @return Query collocation model.
      */
     public GridH2CollocationModel queryCollocationModel() {
-        return qryCollocationModel;
+        return qryCollocationMdl;
     }
 
     /**
-     * @param qryCollocationModel Query collocation model.
+     * @param qryCollocationMdl Query collocation model.
      */
-    public void queryCollocationModel(GridH2CollocationModel 
qryCollocationModel) {
-        this.qryCollocationModel = qryCollocationModel;
+    public void queryCollocationModel(GridH2CollocationModel 
qryCollocationMdl) {
+        this.qryCollocationMdl = qryCollocationMdl;
     }
 
     /**
@@ -557,13 +557,13 @@ public class GridH2QueryContext {
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            int result = locNodeId.hashCode();
+            int res = locNodeId.hashCode();
 
-            result = 31 * result + nodeId.hashCode();
-            result = 31 * result + (int)(qryId ^ (qryId >>> 32));
-            result = 31 * result + type.hashCode();
+            res = 31 * res + nodeId.hashCode();
+            res = 31 * res + (int)(qryId ^ (qryId >>> 32));
+            res = 31 * res + type.hashCode();
 
-            return result;
+            return res;
         }
 
         /** {@inheritDoc} */
@@ -593,9 +593,9 @@ public class GridH2QueryContext {
 
         /** {@inheritDoc} */
         @Override public boolean equals(Object o) {
-            SourceKey sourceKey = (SourceKey)o;
+            SourceKey srcKey = (SourceKey)o;
 
-            return batchLookupId == sourceKey.batchLookupId && 
ownerId.equals(sourceKey.ownerId);
+            return batchLookupId == srcKey.batchLookupId && 
ownerId.equals(srcKey.ownerId);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 8fd2880..012fb0b 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -156,7 +156,7 @@ public class GridH2Table extends TableBase {
      * @return {@code true} If this is a partitioned table.
      */
     public boolean isPartitioned() {
-        return desc != null && IgniteH2Indexing.isPartitioned(desc.context());
+        return desc != null && desc.context().isPartitioned();
     }
 
     /**
@@ -909,7 +909,7 @@ public class GridH2Table extends TableBase {
 
         /** {@inheritDoc} */
         @Override public Cursor find(TableFilter filter, SearchRow first, 
SearchRow last) {
-            return find(filter.getSession(), first, last);
+            return delegate.find(filter, first, last);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 0b85284..7b7e0fa 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -478,6 +478,11 @@ public class GridH2TreeIndex extends GridH2IndexBase 
implements Comparator<GridS
     }
 
     /** {@inheritDoc} */
+    @Override public Cursor find(TableFilter filter, SearchRow first, 
SearchRow last) {
+        return new GridH2Cursor(doFind(first, true, last));
+    }
+
+    /** {@inheritDoc} */
     @Override public Cursor find(Session ses, @Nullable SearchRow first, 
@Nullable SearchRow last) {
         return new GridH2Cursor(doFind(first, true, last));
     }
@@ -507,11 +512,28 @@ public class GridH2TreeIndex extends GridH2IndexBase 
implements Comparator<GridS
      * @param last Upper bound always inclusive.
      * @return Iterator over rows in given range.
      */
+    private Iterator<GridH2Row> doFind(@Nullable SearchRow first,
+        boolean includeFirst,
+        @Nullable SearchRow last) {
+        return doFind(first, includeFirst, last, true);
+    }
+
+    /**
+     * Returns sub-tree bounded by given values; the thread-local filter is applied only if {@code filter} is {@code true}.
+     *
+     * @param first Lower bound.
+     * @param includeFirst Whether lower bound should be inclusive.
+     * @param last Upper bound always inclusive.
+     * @return Iterator over rows in given range.
+     */
     @SuppressWarnings("unchecked")
-    private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean 
includeFirst, @Nullable SearchRow last) {
+    private Iterator<GridH2Row> doFind(@Nullable SearchRow first,
+        boolean includeFirst,
+        @Nullable SearchRow last,
+        boolean filter) {
         ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = 
treeForRead();
 
-        return doFind0(t, first, includeFirst, last, threadLocalFilter());
+        return doFind0(t, first, includeFirst, last, filter ? 
threadLocalFilter() : null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
index 56a2f11..4c71a2e 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
@@ -163,12 +163,12 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
     public void testJoin() throws Exception {
         Ignite client = grid(2);
 
-        Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
         IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
         IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
         IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
 
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
         AtomicInteger pKey = new AtomicInteger(100_000);
         AtomicInteger orgKey = new AtomicInteger();
         AtomicInteger accKey = new AtomicInteger();
@@ -190,6 +190,10 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
         accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1, 
"a0"));
         accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1, 
"a1"));
 
+//        checkQuery("select p._key, p.name, a.name " +
+//            "from \"person\".Person p, \"acc\".Account a " +
+//            "where p._key = a.personId", orgCache, true, 2);
+
         checkQuery("select o.name, p._key, p.name, a.name " +
             "from \"org\".Organization o, \"person\".Person p, \"acc\".Account 
a " +
             "where p.orgId = o._key and p._key = a.personId", orgCache, true, 
2);
@@ -201,6 +205,34 @@ public class 
IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
         checkQuery("select o.name, p._key, p.name, a.name " +
             "from \"person\".Person p, \"org\".Organization o, \"acc\".Account 
a " +
             "where p.orgId = o._key and p._key = a.personId", orgCache, true, 
2);
+
+        String[] cacheNames = {"\"org\".Organization o", "\"person\".Person 
p", "\"acc\".Account a"};
+
+        for (int c1 = 0; c1 < cacheNames.length; c1++) {
+            for (int c2 = 0; c2 < cacheNames.length; c2++) {
+                if (c2 == c1)
+                    continue;
+
+                for (int c3 = 0; c3 < cacheNames.length; c3++) {
+                    if (c3 == c1 || c3 == c2)
+                        continue;
+
+                    String cache1 = cacheNames[c1];
+                    String cache2 = cacheNames[c2];
+                    String cache3 = cacheNames[c3];
+
+                    StringBuilder qry = new StringBuilder("select o.name, 
p._key, p.name, a.name from ").
+                        append(cache1).append(", ").
+                        append(cache2).append(", ").
+                        append(cache3).append(" ").
+                        append("where p.orgId = o._key and p._key = 
a.personId");
+
+                    checkQuery(qry.toString(), orgCache, true, 2);
+
+                    checkQuery(qry.toString(), orgCache, false, 2);
+                }
+            }
+        }
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
index 0827bac..e4ee2fd 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
@@ -108,6 +108,9 @@ public class IgniteCacheDistributedJoinQueryTest extends 
GridCommonAbstractTest
 
             List<Integer> orgIds = putData1();
 
+            checkQuery("select _key, name from \"org\".Organization o " +
+                "inner join (select orgId from Person) p on p.orgId = o._key", 
pCache, total);
+
             checkQuery("select o._key, o.name, p._key, p.name " +
                 "from \"org\".Organization o, Person p " +
                 "where p.orgId = o._key", pCache, total);
@@ -336,6 +339,46 @@ public class IgniteCacheDistributedJoinQueryTest extends 
GridCommonAbstractTest
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery5() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = 
cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, 
false)));
+            CacheConfiguration ccfg2 = 
cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            Integer orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("org-" + orgId));
+
+            Integer pId = keyForNode(aff, pKey, node1);
+
+            pCache.put(pId, new Person(orgId, "p-" + orgId));
+
+            checkQuery("select o._key from \"org\".Organization o, Person p 
where p.orgId = o._key", pCache, 1);
+
+            checkQuery("select o.name from \"org\".Organization o where o._key 
in " +
+                "(select o._key from \"org\".Organization o, Person p where 
p.orgId = o._key)", pCache, 1);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
      * @param sql SQL.
      * @param cache Cache.
      * @param expSize Expected results size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
new file mode 100644
index 0000000..3c0449b
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import 
org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
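+ * Checks joins of a replicated person cache with an account cache (0, 1 and 2 backups), with data mirrored into H2.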
+ */
+public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends 
AbstractH2CompareQueryTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ACCOUNT_CACHE = "acc";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean h2DataInserted;
+
+    /** {@inheritDoc} */
+    @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, 
CacheMode mode) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initCacheAndDbData() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkAllDataEquals() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration personCache() {
+        CacheConfiguration ccfg = configuration(PERSON_CACHE, 0);
+
+        // Person cache is replicated.
+        ccfg.setCacheMode(REPLICATED);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Person.class.getName());
+        entity.addQueryField("name", String.class.getName(), null);
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration accountCache(int backups) {
+        CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, backups);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Account.class.getName());
+        entity.addQueryField("personId", Integer.class.getName(), null);
+        entity.addQueryField("name", String.class.getName(), null);
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true;
+
+        startGrid(SRVS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Statement initializeH2Schema() throws SQLException {
+        Statement st = super.initializeH2Schema();
+
+        st.execute("CREATE SCHEMA \"person\"");
+        st.execute("CREATE SCHEMA \"acc\"");
+
+        st.execute("create table \"person\".PERSON" +
+            "  (_key int not null," +
+            "  _val other not null," +
+            "  name varchar(255))");
+
+        st.execute("create table \"acc\".ACCOUNT" +
+            "  (_key int not null," +
+            "  _val other not null," +
+            "  personId int," +
+            "  name varchar(255))");
+
+        return st;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(SRVS);
+
+        client.createCache(personCache());
+
+        checkJoin(0);
+
+        h2DataInserted = true;
+
+        checkJoin(1);
+
+        checkJoin(2);
+    }
+
+    /**
+     * @param accBackups Number of backups for the account cache that this check creates and destroys.
+     * @throws Exception If failed.
+     */
+    private void checkJoin(int accBackups) throws Exception {
+        Ignite client = grid(SRVS);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger(100_000);
+        AtomicInteger accKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        try {
+            IgniteCache<Object, Object> accCache = 
client.createCache(accountCache(accBackups));
+
+            Integer pKey1 = keyForNode(aff, pKey, node0); // No accounts.
+            insert(personCache, pKey1, new Person("p1"));
+
+            Integer pKey2 = keyForNode(aff, pKey, node0); // 1 collocated 
account.
+            insert(personCache, pKey2, new Person("p2"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey2, "a-p2"));
+
+            Integer pKey3 = keyForNode(aff, pKey, node0); // 1 non-collocated 
account.
+            insert(personCache, pKey3, new Person("p3"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey3, "a-p3"));
+
+            Integer pKey4 = keyForNode(aff, pKey, node0); // 1 collocated, 1 
non-collocated account.
+            insert(personCache, pKey4, new Person("p4"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey4, "a-p4-1"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey4, "a-p4-2"));
+
+            Integer pKey5 = keyForNode(aff, pKey, node0); // 2 collocated 
accounts.
+            insert(personCache, pKey5, new Person("p5"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey5, "a-p5-1"));
+            insert(accCache, keyForNode(aff, accKey, node0), new 
Account(pKey5, "a-p5-2"));
+
+            Integer pKey6 = keyForNode(aff, pKey, node0); // 2 non-collocated 
accounts.
+            insert(personCache, pKey6, new Person("p6"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey6, "a-p6-1"));
+            insert(accCache, keyForNode(aff, accKey, node1), new 
Account(pKey6, "a-p6-2"));
+
+            Integer[] keys = {pKey1, pKey2, pKey3, pKey4, pKey5, pKey6};
+
+            for (int i = 0; i < keys.length; i++) {
+                log.info("Test key: " + i);
+
+                Integer key = keys[i];
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"person\".Person p, \"acc\".Account a " +
+                    "where p._key = a.personId and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"acc\".Account a, \"person\".Person p " +
+                    "where p._key = a.personId and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"person\".Person p left outer join \"acc\".Account 
a " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"person\".Person p right outer join \"acc\".Account 
a " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"acc\".Account a left outer join \"person\".Person 
p " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+
+                checkQuery("select p._key, p.name, a.name " +
+                    "from \"acc\".Account a right outer join \"person\".Person 
p " +
+                    "on (p._key = a.personId) and p._key=?", accCache, true, 
key);
+            }
+        }
+        finally {
+            client.destroyCache(ACCOUNT_CACHE);
+
+            personCache.removeAll();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param p Person.
+     * @throws Exception If failed.
+     */
+    private void insert(IgniteCache<Object, Object> cache, int key, Person p) 
throws Exception {
+        cache.put(key, p);
+
+        if (h2DataInserted)
+            return;
+
+        try(PreparedStatement st = conn.prepareStatement("insert into 
\"person\".PERSON " +
+            "(_key, _val, name) values(?, ?, ?)")) {
+            st.setObject(1, key);
+            st.setObject(2, p);
+            st.setObject(3, p.name);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param a Account.
+     * @throws Exception If failed.
+     */
+    private void insert(IgniteCache<Object, Object> cache, int key, Account a) 
throws Exception {
+        cache.put(key, a);
+
+        if (h2DataInserted)
+            return;
+
+        try(PreparedStatement st = conn.prepareStatement("insert into 
\"acc\".ACCOUNT " +
+            "(_key, _val, personId, name) values(?, ?, ?, ?)")) {
+            st.setObject(1, key);
+            st.setObject(2, a);
+            st.setObject(3, a.personId);
+            st.setObject(4, a.name);
+
+            st.executeUpdate();
+        }
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        Object... args) throws Exception {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setArgs(args)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+
+        log.info("Plan: " + plan);
+
+        compareQueryRes0(cache, sql, true, enforceJoinOrder, args, 
Ordering.RANDOM);
+    }
+
+    /**
+     * Account value; {@code personId} holds the key of the owning person.
+     */
+    private static class Account implements Serializable {
+        /** */
+        int personId;
+
+        /** */
+        String name;
+
+        /**
+         * @param personId Person ID.
+         * @param name Name.
+         */
+        public Account(int personId, String name) {
+            this.personId = personId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+
+    /**
+     * Person value stored in the person cache.
+     */
+    private static class Person implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Person(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index 39c0cb5..bd773ce 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -53,6 +53,9 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
     private static final String ORG_CACHE = "org";
 
     /** */
+    private static final String ORG_CACHE_REPLICATED = "orgRepl";
+
+    /** */
     private boolean client;
 
     /** {@inheritDoc} */
@@ -95,6 +98,20 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
             ccfgs.add(ccfg);
         }
 
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE_REPLICATED);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("id", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
         cfg.setCacheConfiguration(ccfgs.toArray(new 
CacheConfiguration[ccfgs.size()]));
 
         cfg.setClientMode(client);
@@ -144,10 +161,12 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
 
         IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
         IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+        IgniteCache<Object, Object> orgCacheRepl = 
client.cache(ORG_CACHE_REPLICATED);
 
         List<Integer> keys = primaryKeys(ignite(0).cache(PERSON_CACHE), 3, 
200_000);
 
         orgCache.put(keys.get(0), new Organization(0, "org1"));
+        orgCacheRepl.put(keys.get(0), new Organization(0, "org1"));
         personCache.put(keys.get(1), new Person(0, "p1"));
         personCache.put(keys.get(2), new Person(0, "p2"));
 
@@ -166,6 +185,28 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
         checkQuery("select o.name, p._key, p.name " +
             "from \"org\".Organization o left join \"person\".Person p " +
             "on (p.orgId = o.id)", orgCache, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"person\".Person p join \"orgRepl\".Organization o " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"orgRepl\".Organization o join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"person\".Person p left join \"orgRepl\".Organization o " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"orgRepl\".Organization o left join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCacheRepl, 2);
+
+        checkQuery("select p.name from \"person\".Person p ", 
ignite(0).cache(PERSON_CACHE), 2);
+        checkQuery("select p.name from \"person\".Person p ", 
ignite(1).cache(PERSON_CACHE), 2);
+
+        for (int i = 0; i < 10; i++)
+            checkQuery("select p.name from \"person\".Person p ", personCache, 
2);
     }
 
     /**
@@ -233,6 +274,7 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest 
extends GridCommonAbstr
         int id;
 
         /**
+         * @param id ID.
          * @param name Name.
          */
         public Organization(int id, String name) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7826b28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
index d936514..fe0fb12 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/AbstractH2CompareQueryTest.java
@@ -56,6 +56,9 @@ public abstract class AbstractH2CompareQueryTest extends 
GridCommonAbstractTest
     /** */
     private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
 
+    /** Number of grids started in {@code beforeTestsStarted()}. */
+    protected static final int SRVS = 4;
+
     /** Partitioned cache. */
     protected static IgniteCache pCache;
 
@@ -122,7 +125,7 @@ public abstract class AbstractH2CompareQueryTest extends 
GridCommonAbstractTest
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        Ignite ignite = startGrids(4);
+        Ignite ignite = startGrids(SRVS);
 
         pCache = ignite.cache("part");
 
@@ -268,10 +271,32 @@ public abstract class AbstractH2CompareQueryTest extends 
GridCommonAbstractTest
      * @return Result set after SQL query execution.
      * @throws SQLException If exception.
      */
+    protected static List<List<?>> compareQueryRes0(IgniteCache cache,
+        String sql,
+        boolean distrib,
+        @Nullable Object[] args,
+        Ordering ordering) throws SQLException {
+        return compareQueryRes0(cache, sql, distrib, false, args, ordering); // Enforce join order is off.
+    }
+
+    /**
+     * Execute given sql query on h2 database and on ignite cache and compare 
results.
+     *
+     * @param cache Ignite cache.
+     * @param sql SQL query.
+     * @param distrib Distributed SQL Join flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param args SQL arguments.
+     * @param ordering Expected ordering of SQL results. If {@link 
Ordering#ORDERED}
+     *      then results will be compared as ordered queries.
+     * @return Result set after SQL query execution.
+     * @throws SQLException If exception.
+     */
     @SuppressWarnings("unchecked")
     protected static List<List<?>> compareQueryRes0(IgniteCache cache,
         String sql,
         boolean distrib,
+        boolean enforceJoinOrder,
         @Nullable Object[] args,
         Ordering ordering) throws SQLException {
         if (args == null)
@@ -286,7 +311,10 @@ public abstract class AbstractH2CompareQueryTest extends 
GridCommonAbstractTest
 //
 //        X.println("Plan : " + plan);
 
-        List<List<?>> cacheRes = cache.query(new 
SqlFieldsQuery(sql).setArgs(args).setDistributedJoins(distrib)).getAll();
+        List<List<?>> cacheRes = cache.query(new SqlFieldsQuery(sql).
+            setArgs(args).
+            setDistributedJoins(distrib).
+            setEnforceJoinOrder(enforceJoinOrder)).getAll();
 
         try {
             assertRsEquals(h2Res, cacheRes, ordering);

Reply via email to