Distributed joins with segmented spatial index tests added.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4dc36948 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4dc36948 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4dc36948 Branch: refs/heads/master Commit: 4dc369487ef6f5583348f6a0232952c441623723 Parents: 9bc776f Author: Andrey V. Mashenkov <[email protected]> Authored: Tue Feb 21 21:13:13 2017 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Tue Feb 21 21:13:13 2017 +0300 ---------------------------------------------------------------------- .../query/h2/GridH2IndexingGeoSelfTest.java | 405 ++++++++++++++----- .../h2/GridH2IndexingSegmentedGeoSelfTest.java | 13 +- 2 files changed, 302 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java index 2843076..839514b 100644 --- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java +++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingGeoSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2; import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.io.ParseException; import com.vividsolutions.jts.io.WKTReader; import java.io.Serializable; import java.util.Arrays; @@ -31,14 +32,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.NotNull; /** * Geo-indexing test. @@ -50,6 +55,12 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { /** */ private static final long DUR = 60000L; + /** Number of generated samples. */ + public static final int ENEMYCAMP_SAMPLES_COUNT = 500; + + /** Number of generated samples. */ + public static final int ENEMY_SAMPLES_COUNT = 1000; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; @@ -60,29 +71,41 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { return DUR * 3; } - /** {@inheritDoc} */ - @Override protected Class<?>[] indexedTypes() { - return new Class<?>[]{ - Integer.class, EnemyCamp.class, - Long.class, Geometry.class // Geometry must be indexed here. - }; + /** + * @param name Cache name. + * @param partitioned Partition or replicated cache. + * @param idxTypes Indexed types. + * @return Cache configuration. + */ + protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, + Class<?>... idxTypes) throws Exception { + return new CacheConfiguration<K, V>(name) + .setName(name) + .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setIndexedTypes(idxTypes); } /** * @throws Exception If failed. */ public void testPrimitiveGeometry() throws Exception { - IgniteCache<Long, Geometry> cache = grid(0).cache(null); + IgniteCache<Long, Geometry> cache = grid(0).getOrCreateCache(cacheConfig("geom", true, Long.class, Geometry.class)); - WKTReader r = new WKTReader(); + try { + WKTReader r = new WKTReader(); - for (long i = 0; i < 100; i++) - cache.put(i, r.read("POINT(" + i + " " + i + ")")); + for (long i = 0; i < 100; i++) + cache.put(i, r.read("POINT(" + i + " " + i + ")")); - List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?") - .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll(); + List<List<?>> res = cache.query(new SqlFieldsQuery("explain select _key from Geometry where _val && ?") + .setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))")).setLocal(true)).getAll(); - assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx")); + assertTrue("__ explain: " + res, res.get(0).get(0).toString().contains("_val_idx")); + } + finally { + cache.destroy(); + } } /** @@ -90,59 +113,65 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { */ @SuppressWarnings("unchecked") public void testGeo() throws Exception { - IgniteCache<Integer, EnemyCamp> cache = grid(0).cache(null); + IgniteCache<Integer, EnemyCamp> cache = grid(0).getOrCreateCache( + cacheConfig("camp", true, Integer.class, EnemyCamp.class)); - WKTReader r = new WKTReader(); + try { + WKTReader r = new WKTReader(); - cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A")); - cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B")); - cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C")); - cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D")); + cache.getAndPut(0, new EnemyCamp(r.read("POINT(25 75)"), "A")); + cache.getAndPut(1, new EnemyCamp(r.read("POINT(70 70)"), "B")); + cache.getAndPut(2, new EnemyCamp(r.read("POINT(70 30)"), "C")); + cache.getAndPut(3, new EnemyCamp(r.read("POINT(75 25)"), "D")); - SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?"); + SqlQuery<Integer, EnemyCamp> qry = new SqlQuery(EnemyCamp.class, "coords && ?"); - Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query( - qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); + Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query( + qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); - checkPoints(res, "A"); + checkPoints(res, "A"); - res = cache.query( - qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); + res = cache.query( + qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); - checkPoints(res, "C", "D"); + checkPoints(res, "C", "D"); - // Move B to the first polygon. - cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B")); + // Move B to the first polygon. + cache.getAndPut(1, new EnemyCamp(r.read("POINT(20 75)"), "B")); - res = cache.query( - qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); + res = cache.query( + qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); - checkPoints(res, "A", "B"); + checkPoints(res, "A", "B"); - // Move B to the second polygon. - cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B")); + // Move B to the second polygon. + cache.getAndPut(1, new EnemyCamp(r.read("POINT(30 30)"), "B")); - res = cache.query( - qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); + res = cache.query( + qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); - checkPoints(res, "B", "C", "D"); + checkPoints(res, "B", "C", "D"); - // Remove B. - cache.getAndRemove(1); + // Remove B. + cache.getAndRemove(1); - res = cache.query( - qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); + res = cache.query( + qry.setArgs(r.read("POLYGON((5 70, 5 80, 30 80, 30 70, 5 70))"))).getAll(); - checkPoints(res, "A"); + checkPoints(res, "A"); - res = cache.query( - qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); + res = cache.query( + qry.setArgs(r.read("POLYGON((10 5, 10 35, 70 30, 75 25, 10 5))"))).getAll(); - checkPoints(res, "C", "D"); + checkPoints(res, "C", "D"); - // Check explaint request. - assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " + - "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx")); + // Check explaint request. + assertTrue(F.first(cache.query(new SqlFieldsQuery("explain select * from EnemyCamp " + + "where coords && 'POINT(25 75)'")).getAll()).get(0).toString().contains("coords_idx")); + } + finally { + cache.destroy(); + } } /** @@ -150,100 +179,107 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { */ @SuppressWarnings("unchecked") public void testGeoMultithreaded() throws Exception { - final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).cache(null); - final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache(null); - final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache(null); + final CacheConfiguration<Integer, EnemyCamp> ccfg = cacheConfig("camp", true, Integer.class, EnemyCamp.class); - final String[] points = new String[CNT]; + final IgniteCache<Integer, EnemyCamp> cache1 = grid(0).getOrCreateCache(ccfg); + final IgniteCache<Integer, EnemyCamp> cache2 = grid(1).cache("camp"); + final IgniteCache<Integer, EnemyCamp> cache3 = grid(2).cache("camp"); - WKTReader r = new WKTReader(); + try { + final String[] points = new String[CNT]; - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + WKTReader r = new WKTReader(); - for (int idx = 0; idx < CNT; idx++) { - int x = rnd.nextInt(1, 100); - int y = rnd.nextInt(1, 100); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx))); + for (int idx = 0; idx < CNT; idx++) { + int x = rnd.nextInt(1, 100); + int y = rnd.nextInt(1, 100); - points[idx] = Integer.toString(idx); - } + cache1.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx))); - Thread.sleep(200); + points[idx] = Integer.toString(idx); + } - final AtomicBoolean stop = new AtomicBoolean(); - final AtomicReference<Exception> err = new AtomicReference<>(); + Thread.sleep(200); - IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - WKTReader r = new WKTReader(); + final AtomicBoolean stop = new AtomicBoolean(); + final AtomicReference<Exception> err = new AtomicReference<>(); - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + IgniteInternalFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + WKTReader r = new WKTReader(); - while (!stop.get()) { - int cacheIdx = rnd.nextInt(0, 3); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3; + while (!stop.get()) { + int cacheIdx = rnd.nextInt(0, 3); - int idx = rnd.nextInt(CNT); - int x = rnd.nextInt(1, 100); - int y = rnd.nextInt(1, 100); + IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3; - cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx))); + int idx = rnd.nextInt(CNT); + int x = rnd.nextInt(1, 100); + int y = rnd.nextInt(1, 100); - U.sleep(50); - } + cache.getAndPut(idx, new EnemyCamp(r.read("POINT(" + x + " " + y + ")"), Integer.toString(idx))); - return null; - } - }, Runtime.getRuntime().availableProcessors(), "put-thread"); + U.sleep(50); + } - IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - WKTReader r = new WKTReader(); + return null; + } + }, Runtime.getRuntime().availableProcessors(), "put-thread"); - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + IgniteInternalFuture<?> qryFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + WKTReader r = new WKTReader(); - while (!stop.get()) { - try { - int cacheIdx = rnd.nextInt(0, 3); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3; + while (!stop.get()) { + try { + int cacheIdx = rnd.nextInt(0, 3); - SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>( - EnemyCamp.class, "coords && ?"); + IgniteCache<Integer, EnemyCamp> cache = cacheIdx == 0 ? cache1 : cacheIdx == 1 ? cache2 : cache3; - Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs( - r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll(); + SqlQuery<Integer, EnemyCamp> qry = new SqlQuery<>( + EnemyCamp.class, "coords && ?"); - checkPoints(res, points); + Collection<Cache.Entry<Integer, EnemyCamp>> res = cache.query(qry.setArgs( + r.read("POLYGON((0 0, 0 100, 100 100, 100 0, 0 0))"))).getAll(); - U.sleep(5); - } - catch (Exception e) { - err.set(e); + checkPoints(res, points); - stop.set(true); + U.sleep(5); + } + catch (Exception e) { + err.set(e); - break; + stop.set(true); + + break; + } } - } - return null; - } - }, 4, "qry-thread"); + return null; + } + }, 4, "qry-thread"); - U.sleep(60000L); + U.sleep(6000L); - stop.set(true); + stop.set(true); - putFut.get(); - qryFut.get(); + putFut.get(); + qryFut.get(); - Exception err0 = err.get(); + Exception err0 = err.get(); - if (err0 != null) - throw err0; + if (err0 != null) + throw err0; + } + finally { + cache1.destroy(); + } } /** @@ -252,7 +288,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { * @param res Result. * @param points Expected points. */ - private void checkPoints( Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) { + private void checkPoints(Collection<Cache.Entry<Integer, EnemyCamp>> res, String... points) { Set<String> set = new HashSet<>(Arrays.asList(points)); assertEquals(set.size(), res.size()); @@ -262,12 +298,157 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { } /** + * @throws Exception if fails. + */ + public void testSegmentedGeoIndexJoin() throws Exception { + IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class)); + IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", true, Integer.class, EnemyCamp.class)); + + try { + fillCache(); + + checkDistributedQuery(); + + checkLocalQuery(); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** + * @throws Exception if fails. + */ + public void testSegmentedGeoIndexJoin2() throws Exception { + IgniteCache<Integer, Enemy> c1 = ignite(0).getOrCreateCache(cacheConfig("enemy", true, Integer.class, Enemy.class)); + IgniteCache<Integer, EnemyCamp> c2 = ignite(0).getOrCreateCache(cacheConfig("camp", false, Integer.class, EnemyCamp.class)); + + try { + fillCache(); + + checkDistributedQuery(); + + checkLocalQuery(); + } + finally { + c1.destroy(); + c2.destroy(); + } + } + + /** */ + private void checkDistributedQuery() throws ParseException { + IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy"); + IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp"); + + final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))"); + + int expectedEnemies = 0; + + for (Cache.Entry<Integer, Enemy> e : c1) { + final Integer campID = e.getValue().campId; + + if (30 <= campID && campID < ENEMYCAMP_SAMPLES_COUNT) { + final EnemyCamp camp = c2.get(campID); + + if (lethalArea.covers(camp.coords)) + expectedEnemies++; + } + } + + final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " + + "where e.campId = c._key and c.coords && ?").setArgs(lethalArea); + + List<List<?>> result = c1.query(query.setDistributedJoins(true)).getAll(); + + assertEquals(expectedEnemies, result.size()); + } + + /** */ + private void checkLocalQuery() throws ParseException { + IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy"); + IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp"); + + final Geometry lethalArea = new WKTReader().read("POLYGON((30 30, 30 70, 70 70, 70 30, 30 30))"); + + Set<Integer> localCampsIDs = new HashSet<>(); + + for(Cache.Entry<Integer, EnemyCamp> e : c2.localEntries()) + localCampsIDs.add(e.getKey()); + + int expectedEnemies = 0; + + for (Cache.Entry<Integer, Enemy> e : c1.localEntries()) { + final Integer campID = e.getValue().campId; + + if (localCampsIDs.contains(campID)) { + final EnemyCamp camp = c2.get(campID); + + if (lethalArea.covers(camp.coords)) + expectedEnemies++; + } + } + + final SqlFieldsQuery query = new SqlFieldsQuery("select e._val, c._val from \"enemy\".Enemy e, \"camp\".EnemyCamp c " + + "where e.campId = c._key and c.coords && ?").setArgs(lethalArea); + + List<List<?>> result = c1.query(query.setLocal(true)).getAll(); + + assertEquals(expectedEnemies, result.size()); + } + + /** */ + private void fillCache() throws ParseException { + IgniteCache<Integer, Enemy> c1 = ignite(0).cache("enemy"); + IgniteCache<Integer, EnemyCamp> c2 = ignite(0).cache("camp"); + + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + WKTReader r = new WKTReader(); + + for (int i = 0; i < ENEMYCAMP_SAMPLES_COUNT; i++) { + final String point = String.format("POINT(%d %d)", rnd.nextInt(100), rnd.nextInt(100)); + + c2.put(i, new EnemyCamp(r.read(point), "camp-" + i)); + } + + for (int i = 0; i < ENEMY_SAMPLES_COUNT; i++) { + int campID = 30 + rnd.nextInt(ENEMYCAMP_SAMPLES_COUNT + 10); + + c1.put(i, new Enemy(campID, "enemy-" + i)); + } + } + + /** + * + */ + private static class Enemy { + /** */ + @QuerySqlField + int campId; + + /** */ + @QuerySqlField + String name; + + /** + * @param campId Camp ID. + * @param name Name. + */ + public Enemy(int campId, String name) { + this.campId = campId; + this.name = name; + } + } + + /** * */ - private static class EnemyCamp implements Serializable { + protected static class EnemyCamp implements Serializable { /** */ @QuerySqlField(index = true) - private Geometry coords; + Geometry coords; /** */ @QuerySqlField @@ -277,7 +458,7 @@ public class GridH2IndexingGeoSelfTest extends GridCacheAbstractSelfTest { * @param coords Coordinates. * @param name Name. */ - private EnemyCamp(Geometry coords, String name) { + EnemyCamp(Geometry coords, String name) { this.coords = coords; this.name = name; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4dc36948/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java index e404f38..eb0fd0f 100644 --- a/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java +++ b/modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingSegmentedGeoSelfTest.java @@ -23,10 +23,15 @@ import org.apache.ignite.configuration.CacheConfiguration; * Test for segmented geo index. */ public class GridH2IndexingSegmentedGeoSelfTest extends GridH2IndexingGeoSelfTest { - /** {@inheritDoc} */ - @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { - return super.cacheConfiguration(gridName).setQueryParallelism(7); - } + /** */ + private static int QRY_PARALLELISM_LVL = 7; + /** {@inheritDoc} */ + @Override + protected <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, + Class<?>... idxTypes) throws Exception { + final CacheConfiguration<K, V> ccfg = super.cacheConfig(name, partitioned, idxTypes); + return ccfg.setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1); + } } \ No newline at end of file
