IGNITE-4436 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/effc624d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/effc624d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/effc624d Branch: refs/heads/ignite-4436-2 Commit: effc624da659724886bff6685d53f535750a3ea5 Parents: 2a572c4 Author: Alexey Kuznetsov <[email protected]> Authored: Mon Feb 6 11:07:45 2017 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Feb 6 14:56:23 2017 +0700 ---------------------------------------------------------------------- .../processors/query/GridRunningQueryInfo.java | 36 ++++- .../query/VisorCollectCurrentQueriesTask.java | 17 +-- .../ignite/internal/visor/query/VisorQuery.java | 69 ---------- .../internal/visor/query/VisorRunningQuery.java | 119 ++++++++++++++++ .../processors/query/h2/IgniteH2Indexing.java | 66 ++++----- .../h2/twostep/GridReduceQueryExecutor.java | 9 +- .../cache/CacheSqlQueryValueCopySelfTest.java | 137 ++++++++++++++++++- .../cache/GridCacheCrossCacheQuerySelfTest.java | 103 -------------- 8 files changed, 338 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java index ea37d15..d77c8c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.query; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; + /** * Query descriptor. */ @@ -27,6 +29,9 @@ public class GridRunningQueryInfo { /** */ private final String qry; + /** Query type. */ + private final GridCacheQueryType qryType; + /** */ private final String cache; @@ -36,19 +41,27 @@ public class GridRunningQueryInfo { /** */ private final GridQueryCancel cancel; + /** */ + private final boolean loc; + /** * @param id Query ID. * @param qry Query text. + * @param qryType Query type. * @param cache Cache where query was executed. * @param startTime Query start time. * @param cancel Query cancel. + * @param loc Local query flag. */ - public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) { + public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime, + GridQueryCancel cancel, boolean loc) { this.id = id; this.qry = qry; + this.qryType = qryType; this.cache = cache; this.startTime = startTime; this.cancel = cancel; + this.loc = loc; } /** @@ -66,6 +79,13 @@ public class GridRunningQueryInfo { } /** + * @return Query type. + */ + public GridCacheQueryType queryType() { + return qryType; + } + + /** * @return Cache where query was executed. */ public String cache() { @@ -95,4 +115,18 @@ public class GridRunningQueryInfo { if (cancel != null) cancel.cancel(); } + + /** + * @return {@code true} if query can be cancelled. + */ + public boolean cancelable() { + return cancel != null; + } + + /** + * @return {@code true} if query is local. + */ + public boolean local() { + return loc; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java index 0dc0ec5..621b2bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java @@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable; * Task to collect currently running queries. */ @GridInternal -public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorQuery>>, Collection<VisorQuery>> { +public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorRunningQuery>>, Collection<VisorRunningQuery>> { /** */ private static final long serialVersionUID = 0L; @@ -45,12 +45,12 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map } /** {@inheritDoc} */ - @Nullable @Override protected Map<UUID, Collection<VisorQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException { - Map<UUID, Collection<VisorQuery>> map = new HashMap<>(); + @Nullable @Override protected Map<UUID, Collection<VisorRunningQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException { + Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>(); for (ComputeJobResult res : results) if (res.getException() != null) { - Collection<VisorQuery> queries = res.getData(); + Collection<VisorRunningQuery> queries = res.getData(); map.put(res.getNode().id(), queries); } @@ -61,7 +61,7 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map /** * Job to collect currently running queries from node. */ - private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorQuery>> { + private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorRunningQuery>> { /** * Create job with specified argument. * @@ -73,13 +73,14 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map } /** {@inheritDoc} */ - @Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException { + @Override protected Collection<VisorRunningQuery> run(@Nullable Long duration) throws IgniteException { Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration); - Collection<VisorQuery> res = new ArrayList<>(queries.size()); + Collection<VisorRunningQuery> res = new ArrayList<>(queries.size()); for (GridRunningQueryInfo qry : queries) - res.add(new VisorQuery(qry.id(), qry.query(), qry.cache())); + res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), qry.startTime(), + qry.cancelable(), qry.local())); return res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java deleted file mode 100644 index e9beff9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.query; - -import java.io.Serializable; - -/** - * Arguments for {@link VisorQueryTask}. - */ -public class VisorQuery implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private Long id; - - /** Query text. */ - private String qry; - - /** Cache name for query. */ - private String cache; - - /** - * @param id Query ID. - * @param qry Query text. - * @param cache Cache where query was executed. - */ - public VisorQuery(Long id, String qry, String cache) { - this.id = id; - this.qry = qry; - this.cache = cache; - } - - /** - * @return Query ID. - */ - public Long id() { - return id; - } - - /** - * @return Query txt. - */ - public String query() { - return qry; - } - - /** - * @return Cache name. - */ - public String getCache() { - return cache; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java new file mode 100644 index 0000000..5605ea2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.query; + +import java.io.Serializable; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; + +/** + * Descriptor of running query. + */ +public class VisorRunningQuery implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long id; + + /** Query text. */ + private String qry; + + /** Query type. */ + private GridCacheQueryType qryType; + + /** Cache name for query. */ + private String cache; + + /** */ + private long startTime; + + /** */ + private boolean cancellable; + + /** */ + private boolean loc; + + /** + * @param id Query ID. + * @param qry Query text. + * @param qryType Query type. + * @param cache Cache where query was executed. + * @param startTime Query start time. + * @param cancellable {@code true} if query can be canceled. + * @param loc {@code true} if query is local. + */ + public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, long startTime, + boolean cancellable, boolean loc) { + this.id = id; + this.qry = qry; + this.qryType = qryType; + this.cache = cache; + this.startTime = startTime; + this.cancellable = cancellable; + this.loc = loc; + } + + /** + * @return Query ID. + */ + public long id() { + return id; + } + + /** + * @return Query txt. + */ + public String query() { + return qry; + } + + /** + * @return Query type. + */ + public GridCacheQueryType queryType() { + return qryType; + } + + /** + * @return Cache name. + */ + public String getCache() { + return cache; + } + + /** + * @return Query start time. + */ + public long getStartTime() { + return startTime; + } + + /** + * @return {@code true} if query can be cancelled. + */ + public boolean isCancelable() { + return cancellable; + } + + /** + * @return {@code true} if query is local. + */ + public boolean isLocal() { + return loc; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index c0f5f09..5be4f03 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -182,6 +182,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.IgniteSystemProperties.getString; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED; @@ -782,8 +785,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { IndexingQueryFilter filters) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); - if (tbl != null && tbl.luceneIdx != null) - return tbl.luceneIdx.query(qry, filters); + if (tbl != null && tbl.luceneIdx != null) { + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName, + U.currentTimeMillis(), null, true); + + try { + runs.put(run.id(), run); + + return tbl.luceneIdx.query(qry, filters); + } + finally { + runs.remove(run.id()); + } + } return new GridEmptyCloseableIterator<>(); } @@ -841,7 +855,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, + spaceName, U.currentTimeMillis(), cancel, true); runs.putIfAbsent(run.id(), run); @@ -1103,7 +1118,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false)); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), spaceName, qry, U.currentTimeMillis(), null); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName, + U.currentTimeMillis(), null, true); runs.put(run.id(), run); @@ -2269,11 +2285,23 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) { - return rdcQryExec.longRunningQueries(duration); + Collection<GridRunningQueryInfo> res = new ArrayList<>(); + + res.addAll(runs.values()); + res.addAll(rdcQryExec.longRunningQueries(duration)); + + return res; } /** {@inheritDoc} */ @Override public void cancelQueries(Set<Long> queries) { + for (Long qryId : queries) { + GridRunningQueryInfo run = runs.get(qryId); + + if (run != null) + run.cancel(); + } + rdcQryExec.cancelQueries(queries); } @@ -3191,32 +3219,4 @@ public class IgniteH2Indexing implements GridQueryIndexing { lastUsage = U.currentTimeMillis(); } } - - /** - * Query run. - */ - private static class QueryRun { - /** */ - private final GridRunningQueryInfo qry; - - /** */ - private final long startTime; - - /** */ - private final GridQueryCancel cancel; - - /** - * - * @param id - * @param qry - * @param cache - * @param startTime - * @param cancel - */ - public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) { - this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel); - this.startTime = startTime; - this.cancel = cancel; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index febe810..3540141 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -99,6 +99,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; /** @@ -1332,8 +1333,10 @@ public class GridReduceQueryExecutor { * @param queries Queries IDs to cancel. */ public void cancelQueries(Set<Long> queries) { - for (QueryRun run : runs.values()) { - if (queries.contains(run.qry.id())) + for (Long qryId : queries) { + QueryRun run = runs.get(qryId); + + if (run != null) run.qry.cancel(); } } @@ -1371,7 +1374,7 @@ public class GridReduceQueryExecutor { * @param cancel Query cancel handler. */ private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) { - this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel); + this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false); this.conn = (JdbcConnection)conn; this.idxs = new ArrayList<>(idxsCnt); this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE; http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java index e47e893..a91f65e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java @@ -17,15 +17,23 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; +import java.util.Collections; import java.util.List; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { cc.setCopyOnRead(true); cc.setIndexedTypes(Integer.class, Value.class); + cc.setSqlFunctionClasses(TestSQLFunctions.class); cfg.setCacheConfiguration(cc); @@ -195,6 +204,108 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { check(cache); } + /** + * Run specified query in separate thread. + * + * @param qry Query to execute. + */ + private IgniteInternalFuture<?> runQueryAsync(final Query<?> qry) throws Exception { + return multithreadedAsync(new Runnable() { + @Override public void run() { + try { + grid(0).cache(null).query(qry).getAll(); + } + catch (Throwable e) { + e.printStackTrace(); + } + } + }, 1, "run-query"); + } + + /** + * Test collecting info about running. + * + * @throws Exception If failed. + */ + public void testRunningQueries() throws Exception { + IgniteInternalFuture<?> fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3")); + + Thread.sleep(500); + + GridQueryProcessor qryProc = ((IgniteKernal)grid(0)).context().query(); + + Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + + SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"); + qry.setLocal(true); + + fut = runQueryAsync(qry); + + Thread.sleep(500); + + queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + } + + /** + * Test collecting info about running. + * + * @throws Exception If failed. + */ + public void testCancelingQueries() throws Exception { + final Ignite ignite = grid(0); + + runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)")); + + Thread.sleep(500); + + final GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query(); + + Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + final Collection<GridRunningQueryInfo> finalQueries = queries; + + for (GridRunningQueryInfo query : finalQueries) + qryProc.cancelQueries(Collections.singleton(query.id())); + + int n = 100; + + // Give cluster some time to cancel query and cleanup resources. + while (n > 0) { + Thread.sleep(100); + + queries = qryProc.runningQueries(0); + + if (queries.isEmpty()) + break; + + log.info(">>>> Wait for cancel: " + n); + + n--; + } + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + } + /** */ private static class Value { /** */ @@ -223,4 +334,28 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { assertEquals(KEYS, cnt); } -} \ No newline at end of file + + /** + * Utility class with custom SQL functions. + */ + public static class TestSQLFunctions { + /** + * Sleep function to simulate long running queries. + * + * @param x Time to sleep. + * @return Return specified argument. + */ + @QuerySqlFunction + public static long sleep(long x) { + if (x >= 0) + try { + Thread.sleep(x); + } + catch (InterruptedException ignored) { + // No-op. + } + + return x; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index d6a766d..337ae29 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -58,30 +58,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { /** */ private Ignite ignite; - /** - * Utility class with custom SQL functions. - */ - public static class TestSQLFunctions { - /** - * Sleep function to simulate long running queries. - * - * @param x Time to sleep. - * @return Return specified argument. - */ - @QuerySqlFunction - public static long sleep(long x) { - if (x >= 0) - try { - Thread.sleep(x); - } - catch (InterruptedException ignored) { - // No-op. - } - - return x; - } - } - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration c = super.getConfiguration(gridName); @@ -141,8 +117,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { else throw new IllegalStateException("mode: " + mode); - cc.setSqlFunctionClasses(TestSQLFunctions.class); - return cc; } @@ -248,83 +222,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { } /** - * Test collecting info about running. - * - * @throws Exception If failed. - */ - public void testRunningQueries() throws Exception { - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - try { - SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(3000) from FactPurchase limit 1"); - - ignite.cache("partitioned").query(qry).getAll(); - } - catch (Throwable e) { - e.printStackTrace(); - } - } - }, 1); - - Thread.sleep(1000); - - GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query(); - - Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500); - - assertEquals(1, queries.size()); - - fut.get(); - - queries = qryProc.runningQueries(500); - - assertEquals(0, queries.size()); - } - - /** - * Test collecting info about running. - * - * @throws Exception If failed. - */ - public void testCancelingQueries() throws Exception { - IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - try { - SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(500) from FactPurchase limit 100"); - - ignite.cache("partitioned").query(qry).getAll(); - } - catch (Throwable e) { - e.printStackTrace(); - } - } - }, 1); - - Thread.sleep(1000); - - GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query(); - - Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500); - - assertEquals(1, queries.size()); - - for (GridRunningQueryInfo query : queries) - queryProc.cancelQueries(Collections.singleton(query.id())); - - Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources. - - queries = queryProc.runningQueries(500); - - assertEquals(0, queries.size()); - - fut.get(); - - queries = queryProc.runningQueries(500); - - assertEquals(0, queries.size()); - } - - /** * @throws Exception If failed. */ public void testApiQueries() throws Exception {
