IGNITE-4436 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce374cd9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce374cd9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce374cd9 Branch: refs/heads/ignite-4436-2 Commit: ce374cd9e3707458aaf39dc81a824b01879cc36f Parents: 58cf839 Author: Alexey Kuznetsov <[email protected]> Authored: Mon Feb 13 17:31:39 2017 +0700 Committer: Alexey Kuznetsov <[email protected]> Committed: Mon Feb 13 17:31:39 2017 +0700 ---------------------------------------------------------------------- .../processors/query/GridQueryIndexing.java | 4 +--- .../processors/query/GridQueryProcessor.java | 3 +-- .../visor/query/VisorCancelQueriesTask.java | 22 +++++++------------- .../query/VisorCollectRunningQueriesTask.java | 8 +++++-- .../internal/visor/query/VisorRunningQuery.java | 21 +++++++++++++++---- .../processors/query/h2/IgniteH2Indexing.java | 17 ++++++++------- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- 7 files changed, 43 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 133bd76..ca04724 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -21,8 +21,6 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.UUID; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.QueryCursor; @@ -255,7 +253,7 @@ public interface GridQueryIndexing { * * @param queries Queries ID's to cancel. */ - public void cancelQueries(Set<Long> queries); + public void cancelQueries(Collection<Long> queries); /** * Cancels all executing queries. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 5d415a2..ee9224b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -44,7 +44,6 @@ import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -937,7 +936,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * * @param queries Queries ID's to cancel. */ - public void cancelQueries(Set<Long> queries) { + public void cancelQueries(Collection<Long> queries) { if (moduleEnabled()) idx.cancelQueries(queries); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java index 7b3c33c..a6f2d82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java @@ -17,28 +17,25 @@ package org.apache.ignite.internal.visor.query; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.visor.VisorJob; -import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.jetbrains.annotations.Nullable; /** * Task to cancel queries. */ @GridInternal -public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Long>>, Void, Void> { +public class VisorCancelQueriesTask extends VisorOneNodeTask<Collection<Long>, Void> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override protected VisorCancelQueriesJob job(Map<UUID, Set<Long>> arg) { + @Override protected VisorCancelQueriesJob job(Collection<Long> arg) { return new VisorCancelQueriesJob(arg, debug); } @@ -50,7 +47,7 @@ public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Lon /** * Job to cancel queries on node. */ - private static class VisorCancelQueriesJob extends VisorJob<Map<UUID, Set<Long>>, Void> { + private static class VisorCancelQueriesJob extends VisorJob<Collection<Long>, Void> { /** */ private static final long serialVersionUID = 0L; @@ -60,16 +57,13 @@ public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Lon * @param arg Job argument. * @param debug Flag indicating whether debug information should be printed into node log. */ - protected VisorCancelQueriesJob(@Nullable Map<UUID, Set<Long>> arg, boolean debug) { + protected VisorCancelQueriesJob(@Nullable Collection<Long> arg, boolean debug) { super(arg, debug); } /** {@inheritDoc} */ - @Override protected Void run(@Nullable Map<UUID, Set<Long>> arg) throws IgniteException { - Set<Long> queries = arg.get(ignite.localNode().id()); - - if (!F.isEmpty(queries)) - ignite.context().query().cancelQueries(queries); + @Override protected Void run(@Nullable Collection<Long> queries) throws IgniteException { + ignite.context().query().cancelQueries(queries); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java index 8ac8ace..2b40e61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.jetbrains.annotations.Nullable; @@ -49,7 +50,7 @@ public class VisorCollectRunningQueriesTask extends VisorMultiNodeTask<Long, Map Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>(); for (ComputeJobResult res : results) - if (res.getException() != null) { + if (res.getException() == null) { Collection<VisorRunningQuery> queries = res.getData(); map.put(res.getNode().id(), queries); @@ -82,8 +83,11 @@ public class VisorCollectRunningQueriesTask extends VisorMultiNodeTask<Long, Map Collection<VisorRunningQuery> res = new ArrayList<>(queries.size()); + long curTime = U.currentTimeMillis(); + for (GridRunningQueryInfo qry : queries) - res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), qry.startTime(), + res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), + qry.startTime(), curTime - qry.startTime(), qry.cancelable(), qry.local())); return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java index 5605ea2..fc6bc7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java @@ -43,6 +43,9 @@ public class VisorRunningQuery implements Serializable { private long startTime; /** */ + private long duration; + + /** */ private boolean cancellable; /** */ @@ -54,16 +57,19 @@ public class VisorRunningQuery implements Serializable { * @param qryType Query type. * @param cache Cache where query was executed. * @param startTime Query start time. + * @param duration Query current duration. * @param cancellable {@code true} if query can be canceled. * @param loc {@code true} if query is local. */ - public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, long startTime, + public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, + long startTime, long duration, boolean cancellable, boolean loc) { this.id = id; this.qry = qry; this.qryType = qryType; this.cache = cache; this.startTime = startTime; + this.duration = duration; this.cancellable = cancellable; this.loc = loc; } @@ -71,21 +77,21 @@ public class VisorRunningQuery implements Serializable { /** * @return Query ID. */ - public long id() { + public long getId() { return id; } /** * @return Query txt. */ - public String query() { + public String getQuery() { return qry; } /** * @return Query type. */ - public GridCacheQueryType queryType() { + public GridCacheQueryType getQueryType() { return qryType; } @@ -104,6 +110,13 @@ public class VisorRunningQuery implements Serializable { } /** + * @return Query duration. + */ + public long getDuration() { + return duration; + } + + /** * @return {@code true} if query can be cancelled. */ public boolean isCancelable() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index e64c735..e4b0c1f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -48,7 +48,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -2298,15 +2297,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void cancelQueries(Set<Long> queries) { - for (Long qryId : queries) { - GridRunningQueryInfo run = runs.get(qryId); + @Override public void cancelQueries(Collection<Long> queries) { + if (!F.isEmpty(queries)) { + for (Long qryId : queries) { + GridRunningQueryInfo run = runs.get(qryId); - if (run != null) - run.cancel(); - } + if (run != null) + run.cancel(); + } - rdcQryExec.cancelQueries(queries); + rdcQryExec.cancelQueries(queries); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce374cd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3540141..78cadd2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -1332,7 +1332,7 @@ public class GridReduceQueryExecutor { * * @param queries Queries IDs to cancel. */ - public void cancelQueries(Set<Long> queries) { + public void cancelQueries(Collection<Long> queries) { for (Long qryId : queries) { QueryRun run = runs.get(qryId);
