Repository: ignite Updated Branches: refs/heads/master 6771638a5 -> 2311de477
ignite-1127 Query with result size more than one page doesn't increase Query executions count metric - Fixes #23. Signed-off-by: S.Vladykin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2311de47 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2311de47 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2311de47 Branch: refs/heads/master Commit: 2311de4777bff3a6f97904e547cc028d6ea1e51f Parents: 6771638 Author: agura <[email protected]> Authored: Wed Sep 9 16:05:56 2015 +0300 Committer: S.Vladykin <[email protected]> Committed: Wed Sep 9 16:05:56 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 27 +- .../cache/query/GridCacheLocalQueryFuture.java | 5 +- .../cache/query/GridCacheQueryAdapter.java | 43 +-- .../query/GridCacheQueryFutureAdapter.java | 9 +- .../cache/query/GridCacheQueryManager.java | 11 +- .../query/GridCacheQueryMetricsAdapter.java | 125 +++++---- .../processors/query/GridQueryProcessor.java | 50 ++-- .../CacheAbstractQueryMetricsSelfTest.java | 279 +++++++++---------- 8 files changed, 284 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 7c88b98..ce0cdd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -70,6 +70,7 @@ import 
org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -439,7 +440,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ @SuppressWarnings("unchecked") - private QueryCursor<Cache.Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { + private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp) + throws IgniteCheckedException { final CacheQuery<Map.Entry<K,V>> qry; final CacheQueryFuture<Map.Entry<K,V>> fut; @@ -454,7 +456,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = qry.execute(); + fut = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { + return qry.execute(); + } + }, false); } else if (filter instanceof TextQuery) { TextQuery p = (TextQuery)filter; @@ -464,7 +471,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = qry.execute(); + fut = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { + return qry.execute(); + } + }, false); } else if (filter instanceof SpiQuery) { qry = ctx.queries().createSpiQuery(isKeepPortable); @@ -472,7 +484,12 @@ public class 
IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = qry.execute(((SpiQuery)filter).getArgs()); + fut = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { + @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { + return qry.execute(((SpiQuery)filter).getArgs()); + } + }, false); } else { if (filter instanceof SqlFieldsQuery) @@ -619,7 +636,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } catch (Exception e) { if (e instanceof CacheException) - throw e; + throw (CacheException)e; throw new CacheException(e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java index 91fc194..46af18a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java @@ -46,7 +46,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap protected GridCacheLocalQueryFuture(GridCacheContext<K, V> ctx, GridCacheQueryBean qry) { super(ctx, qry, true); - run = new LocalQueryRunnable<>(); + run = new LocalQueryRunnable(); } /** @@ -78,7 +78,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap } /** */ - private class LocalQueryRunnable<K, V, R> implements GridPlainRunnable { + private class LocalQueryRunnable implements GridPlainRunnable { /** {@inheritDoc} */ 
@Override public void run() { try { @@ -101,7 +101,6 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap * @return Query info. * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings({"unchecked"}) private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException { GridCacheQueryBean qry = query(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index a016037..3ac5746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -371,6 +371,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * @return Key-value filter. */ + @SuppressWarnings("unchecked") @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() { return (IgniteBiPredicate<K, V>)filter; } @@ -396,8 +397,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param startTime Start time. * @param duration Duration. */ - public void onExecuted(Object res, Throwable err, long startTime, long duration) { - GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log); + public void onCompleted(Object res, Throwable err, long startTime, long duration) { + GridQueryProcessor.onCompleted(cctx, res, err, startTime, duration, log); } /** {@inheritDoc} */ @@ -431,7 +432,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param args Arguments. * @return Future. 
*/ - @SuppressWarnings("IfMayBeConditional") + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { Collection<ClusterNode> nodes; @@ -440,13 +441,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { nodes = nodes(); } catch (IgniteCheckedException e) { - return queryErrorFuture(cctx, e, log); + return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e); } cctx.checkSecurity(SecurityPermission.CACHE_READ); if (nodes.isEmpty()) - return queryErrorFuture(cctx, new ClusterGroupEmptyCheckedException(), log); + return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException()); if (log.isDebugEnabled()) log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); @@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { cctx.deploy().registerClasses(args); } catch (IgniteCheckedException e) { - return queryErrorFuture(cctx, e, log); + return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e); } } @@ -488,6 +489,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private Collection<ClusterNode> nodes() throws IgniteCheckedException { CacheMode cacheMode = cctx.config().getCacheMode(); + Integer part = partition(); + switch (cacheMode) { case LOCAL: if (prj != null) @@ -495,21 +498,21 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { "(only local node will be queried): " + this); if (type == SCAN && cctx.config().getCacheMode() == LOCAL && - partition() != null && partition() >= cctx.affinity().partitions()) - throw new IgniteCheckedException("Invalid partition number: " + partition()); + part != null && part >= cctx.affinity().partitions()) + throw new IgniteCheckedException("Invalid partition number: " + part); return Collections.singletonList(cctx.localNode()); case REPLICATED: - 
if (prj != null || partition() != null) - return nodes(cctx, prj, partition()); + if (prj != null || part != null) + return nodes(cctx, prj, part); return cctx.affinityNode() ? Collections.singletonList(cctx.localNode()) : - Collections.singletonList(F.rand(nodes(cctx, null, partition()))); + Collections.singletonList(F.rand(nodes(cctx, null, null))); case PARTITIONED: - return nodes(cctx, prj, partition()); + return nodes(cctx, prj, part); default: throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); @@ -537,7 +540,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return F.view(affNodes, new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { - return cctx.discovery().cacheAffinityNode(n, cctx.name()) && (prj == null || prj.node(n.id()) != null) && (part == null || owners.contains(n)); @@ -545,21 +547,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { }); } - /** - * @param cctx Cache context. - * @param e Exception. - * @param log Logger. 
- */ - private static <T> GridCacheQueryErrorFuture<T> queryErrorFuture(GridCacheContext<?, ?> cctx, - Exception e, IgniteLogger log) { - - GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics(); - - GridQueryProcessor.onExecuted(cctx, metrics, null, e, 0, 0, log); - - return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheQueryAdapter.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index 9a83ce9..ad9ee39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -155,7 +155,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda @Override public boolean onDone(Collection<R> res, Throwable err) { cctx.time().removeTimeoutObject(this); - qry.query().onExecuted(res, err, startTime(), duration()); + qry.query().onCompleted(res, err, startTime(), duration()); return super.onDone(res, err); } @@ -413,11 +413,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda } } } - catch (Error e) { - onPageError(nodeId, e); - - throw e; - } catch (Throwable e) { onPageError(nodeId, e); @@ -446,6 +441,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda * @param col Collection. * @return Collection with masked {@code null} values. 
*/ + @SuppressWarnings("unchecked") private Collection<Object> maskNulls(Collection<Object> col) { assert col != null; @@ -460,6 +456,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda * @param col Collection. * @return Collection with unmasked {@code null} values. */ + @SuppressWarnings("unchecked") private Collection<Object> unmaskNulls(Collection<Object> col) { assert col != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 2041464..1d934d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1875,11 +1875,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * @param fail {@code true} if execution failed. + */ + public void onExecuted(boolean fail) { + metrics.onQueryExecute(fail); + } + + /** * @param duration Execution duration. * @param fail {@code true} if execution failed. 
*/ - public void onMetricsUpdate(long duration, boolean fail) { - metrics.onQueryExecute(duration, fail); + public void onCompleted(long duration, boolean fail) { + metrics.onQueryCompleted(duration, fail); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java index fe219a9..1928ea5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.apache.ignite.cache.query.QueryMetrics; +import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jsr166.LongAdder8; /** * Adapter for {@link QueryMetrics}. @@ -32,79 +34,97 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl private static final long serialVersionUID = 0L; /** Minimum time of execution. */ - private volatile long minTime; + private final GridAtomicLong minTime = new GridAtomicLong(); /** Maximum time of execution. */ - private volatile long maxTime; + private final GridAtomicLong maxTime = new GridAtomicLong(); - /** Average time of execution. */ - private volatile double avgTime; + /** Sum of execution time for all completed queries. */ + private final LongAdder8 sumTime = new LongAdder8(); - /** Number of hits. */ - private volatile int execs; + /** Average time of execution. 
+ * If doesn't equal zero then this metrics set is copy from remote node and doesn't actually update. + */ + private double avgTime; - /** Number of fails. */ - private volatile int fails; + /** Number of executions. */ + private final LongAdder8 execs = new LongAdder8(); - /** Whether query was executed at least once. */ - private boolean executed; + /** Number of completed executions. */ + private final LongAdder8 completed = new LongAdder8(); - /** Mutex. */ - private final Object mux = new Object(); + /** Number of fails. */ + private final LongAdder8 fails = new LongAdder8(); /** {@inheritDoc} */ @Override public long minimumTime() { - return minTime; + return minTime.get(); } /** {@inheritDoc} */ @Override public long maximumTime() { - return maxTime; + return maxTime.get(); } /** {@inheritDoc} */ @Override public double averageTime() { - return avgTime; + if (avgTime > 0) + return avgTime; + else { + long val = completed.sum(); + + return val > 0 ? sumTime.sum() / val : 0; + } } /** {@inheritDoc} */ @Override public int executions() { - return execs; + return execs.intValue(); + } + + /** + * Gets total number of completed executions of query. + * This value is actual only for local node. + * + * @return Number of completed executions. + */ + public int completedExecutions() { + return completed.intValue(); } /** {@inheritDoc} */ @Override public int fails() { - return fails; + return fails.intValue(); } /** * Callback for query execution. * - * @param duration Duration of queue execution. * @param fail {@code True} query executed unsuccessfully {@code false} otherwise. 
*/ - public void onQueryExecute(long duration, boolean fail) { - synchronized (mux) { - if (!executed) { - minTime = duration; - maxTime = duration; - - executed = true; - } - else { - if (minTime > duration) - minTime = duration; + public void onQueryExecute(boolean fail) { + execs.increment(); - if (maxTime < duration) - maxTime = duration; - } + if (fail) + fails.increment(); + } - execs++; + /** + * Callback for completion of query execution. + * + * @param duration Duration of queue execution. + * @param fail {@code True} query executed unsuccessfully {@code false} otherwise. + */ + public void onQueryCompleted(long duration, boolean fail) { + minTime.setIfLess(duration); + maxTime.setIfGreater(duration); - if (fail) - fails++; + if (fail) + fails.increment(); + else { + completed.increment(); - avgTime = (avgTime * (execs - 1) + duration) / execs; + sumTime.add(duration); } } @@ -116,33 +136,34 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl public GridCacheQueryMetricsAdapter copy() { GridCacheQueryMetricsAdapter m = new GridCacheQueryMetricsAdapter(); - synchronized (mux) { - m.fails = fails; - m.minTime = minTime; - m.maxTime = maxTime; - m.execs = execs; - m.avgTime = avgTime; - } + // Not synchronized because accuracy isn't critical. 
+ m.fails.add(fails.sum()); + m.minTime.set(minTime.get()); + m.maxTime.set(maxTime.get()); + m.execs.add(execs.sum()); + m.completed.add(completed.sum()); + m.sumTime.add(sumTime.sum()); + m.avgTime = avgTime; return m; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(minTime); - out.writeLong(maxTime); - out.writeDouble(avgTime); - out.writeInt(execs); - out.writeInt(fails); + out.writeLong(minTime.get()); + out.writeLong(maxTime.get()); + out.writeDouble(averageTime()); + out.writeInt(execs.intValue()); + out.writeInt(fails.intValue()); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - minTime = in.readLong(); - maxTime = in.readLong(); + minTime.set(in.readLong()); + maxTime.set(in.readLong()); avgTime = in.readDouble(); - execs = in.readInt(); - fails = in.readInt(); + execs.add(in.readInt()); + fails.add(in.readInt()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 7370996..84db145 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -56,8 +56,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import 
org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -581,7 +581,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return idx.query(space, clause, params, type, filters); } - }); + }, false); } finally { busyLock.leaveBusy(); @@ -609,7 +609,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { qry, cctx.keepPortable()); } - }); + }, false); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -635,7 +635,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException { return idx.queryTwoStep(cctx, qry); } - }); + }, true); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -661,7 +661,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException { return idx.queryTwoStep(cctx, qry); } - }); + }, false); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -731,7 +731,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } }; } - }); + }, false); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -803,7 +803,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return cursor; } - }); + }, true); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -818,7 +818,6 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ - @SuppressWarnings("unchecked") public void remove(String space, CacheObject key, CacheObject val) throws IgniteCheckedException { assert key != null; @@ -904,7 +903,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { type, filters); } - }); + }, false); } finally { busyLock.leaveBusy(); @@ -933,7 +932,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException { return idx.queryFields(space, clause, params, filters); } - }); + }, false); } finally { busyLock.leaveBusy(); @@ -1479,10 +1478,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. * @param clo Closure. + * @param complete Complete. */ - private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo) + public <R> R executeQuery(GridCacheContext<?, ?> cctx, IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException { - final long start = U.currentTimeMillis(); + final long startTime = U.currentTimeMillis(); Throwable err = null; @@ -1491,6 +1491,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { res = clo.apply(); + if (res instanceof CacheQueryFuture) { + CacheQueryFuture fut = (CacheQueryFuture) res; + + err = fut.error(); + } + return res; } catch (GridClosureException e) { @@ -1504,34 +1510,30 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IgniteCheckedException(e); } finally { - GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics(); + cctx.queries().onExecuted(err != null); - onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log); + if (complete && err == null) + onCompleted(cctx, res, null, startTime, U.currentTimeMillis() - startTime, log); } } /** * @param cctx Cctx. - * @param metrics Metrics. * @param res Result. * @param err Err. * @param startTime Start time. * @param duration Duration. 
* @param log Logger. */ - public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics, - Object res, Throwable err, long startTime, long duration, IgniteLogger log) { + public static void onCompleted(GridCacheContext<?, ?> cctx, Object res, Throwable err, + long startTime, long duration, IgniteLogger log) { boolean fail = err != null; - // Update own metrics. - metrics.onQueryExecute(duration, fail); - - // Update metrics in query manager. - cctx.queries().onMetricsUpdate(duration, fail); + cctx.queries().onCompleted(duration, fail); if (log.isTraceEnabled()) - log.trace("Query execution finished [startTime=" + startTime + - ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']'); + log.trace("Query execution completed [startTime=" + startTime + + ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']'); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2311de47/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java index a082abf..28eef90 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java @@ -19,14 +19,18 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.query.QueryMetrics; +import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; 
import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -47,6 +51,16 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { startGridsMultiThreaded(gridCnt); + + IgniteCache<String, Integer> cacheA = grid(0).cache("A"); + + for (int i = 0; i < 100; i++) + cacheA.put(String.valueOf(i), i); + + IgniteCache<String, Integer> cacheB = grid(0).cache("B"); + + for (int i = 0; i < 100; i++) + cacheB.put(String.valueOf(i), i); } /** {@inheritDoc} */ @@ -84,44 +98,30 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra } /** - * Test metrics for SQL queries. + * Test metrics for SQL fields queries. * * @throws Exception In case of error. */ public void testSqlFieldsQueryMetrics() throws Exception { IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - // Execute query. 
SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer"); - cache.query(qry).getAll(); - - QueryMetrics m = cache.queryMetrics(); - - assert m != null; - - info("Metrics: " + m); - - assertEquals(1, m.executions()); - assertEquals(0, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); - - // Execute again with the same parameters. - cache.query(qry).getAll(); - - m = cache.queryMetrics(); + testQueryMetrics(cache, qry); + } - assert m != null; + /** + * Test metrics for SQL fields queries. + * + * @throws Exception In case of error. + */ + public void testSqlFieldsQueryNotFullyFetchedMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - info("Metrics: " + m); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from Integer"); + qry.setPageSize(10); - assertEquals(2, m.executions()); - assertEquals(0, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + testQueryNotFullyFetchedMetrics(cache, qry, false); } /** @@ -132,47 +132,22 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra public void testSqlFieldsQueryFailedMetrics() throws Exception { IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - // Execute query. SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN"); - try { - cache.query(qry).getAll(); - } - catch (Exception e) { - // No-op. - } - - QueryMetrics m = cache.queryMetrics(); - - assert m != null; - - info("Metrics: " + m); - - assertEquals(1, m.executions()); - assertEquals(1, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); - - // Execute again with the same parameters. - try { - cache.query(qry).getAll(); - } - catch (Exception e) { - // No-op. 
- } - - m = cache.queryMetrics(); + testQueryFailedMetrics(cache, qry); + } - assert m != null; + /** + * Test metrics for Scan queries. + * + * @throws Exception In case of error. + */ + public void testScanQueryMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - info("Metrics: " + m); + ScanQuery<String, Integer> qry = new ScanQuery<>(); - assertEquals(2, m.executions()); - assertEquals(2, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + testQueryMetrics(cache, qry); } /** @@ -180,128 +155,136 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra * * @throws Exception In case of error. */ - public void testScanQueryMetrics() throws Exception { + public void testScanQueryNotFullyFetchedMetrics() throws Exception { IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - // Execute query. ScanQuery<String, Integer> qry = new ScanQuery<>(); + qry.setPageSize(10); - cache.query(qry).getAll(); + testQueryNotFullyFetchedMetrics(cache, qry, true); + } - QueryMetrics m = cache.queryMetrics(); + /** + * Test metrics for failed Scan queries. + * + * @throws Exception In case of error. + */ + public void testScanQueryFailedMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - assert m != null; + ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE); - info("Metrics: " + m); + testQueryFailedMetrics(cache, qry); + } - assertEquals(1, m.executions()); - assertEquals(0, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + /** + * Test metrics for SQL cross cache queries. + * + * @throws Exception In case of error. 
+ */ + public void testSqlCrossCacheQueryMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - // Execute again with the same parameters. - cache.query(qry).getAll(); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer"); - m = cache.queryMetrics(); + testQueryMetrics(cache, qry); + } - assert m != null; + /** + * Test metrics for SQL cross cache queries. + * + * @throws Exception In case of error. + */ + public void testSqlCrossCacheQueryNotFullyFetchedMetrics() throws Exception { + IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - info("Metrics: " + m); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer"); + qry.setPageSize(10); - assertEquals(2, m.executions()); - assertEquals(0, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + testQueryNotFullyFetchedMetrics(cache, qry, false); } /** - * Test metrics for failed Scan queries. + * Test metrics for failed SQL cross cache queries. * * @throws Exception In case of error. */ - public void testScanQueryFailedMetrics() throws Exception { + public void testSqlCrossCacheQueryFailedMetrics() throws Exception { IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - // Execute query. - ScanQuery<String, Integer> qry = new ScanQuery<>(Integer.MAX_VALUE); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer"); - try { - cache.query(qry).getAll(); - } - catch (Exception e) { - // No-op. - } + testQueryFailedMetrics(cache, qry); + } - QueryMetrics m = cache.queryMetrics(); + /** + * @param cache Cache. + * @param qry Query. 
+ */ + private void testQueryMetrics(IgniteCache<String, Integer> cache, Query qry) { + cache.query(qry).getAll(); - assert m != null; + GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(1, m.executions()); - assertEquals(1, m.fails()); + assertEquals(1, m.completedExecutions()); + assertEquals(0, m.fails()); assertTrue(m.averageTime() >= 0); assertTrue(m.maximumTime() >= 0); assertTrue(m.minimumTime() >= 0); // Execute again with the same parameters. - try { - cache.query(qry).getAll(); - } - catch (Exception e) { - // No-op. - } - - m = cache.queryMetrics(); + cache.query(qry).getAll(); - assert m != null; + m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(2, m.executions()); - assertEquals(2, m.fails()); + assertEquals(2, m.completedExecutions()); + assertEquals(0, m.fails()); assertTrue(m.averageTime() >= 0); assertTrue(m.maximumTime() >= 0); assertTrue(m.minimumTime() >= 0); } /** - * Test metrics for SQL cross cache queries. - * - * @throws Exception In case of error. + * @param cache Cache. + * @param qry Query. + * @param waitingForCompletion Waiting for query completion. */ - public void testSqlCrossCacheQueryMetrics() throws Exception { - IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); + private void testQueryNotFullyFetchedMetrics(IgniteCache<String, Integer> cache, Query qry, + boolean waitingForCompletion) throws IgniteInterruptedCheckedException { + cache.query(qry).iterator().next(); - // Execute query. 
- SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Integer"); - - cache.query(qry).getAll(); + if (waitingForCompletion) + waitingForCompletion(cache, 1); - QueryMetrics m = cache.queryMetrics(); - - assert m != null; + GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(1, m.executions()); + assertEquals(1, m.completedExecutions()); assertEquals(0, m.fails()); assertTrue(m.averageTime() >= 0); assertTrue(m.maximumTime() >= 0); assertTrue(m.minimumTime() >= 0); // Execute again with the same parameters. - cache.query(qry).getAll(); + cache.query(qry).iterator().next(); - m = cache.queryMetrics(); + if (waitingForCompletion) + waitingForCompletion(cache, 2); - assert m != null; + m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(2, m.executions()); + assertEquals(2, m.completedExecutions()); assertEquals(0, m.fails()); assertTrue(m.averageTime() >= 0); assertTrue(m.maximumTime() >= 0); @@ -309,34 +292,27 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra } /** - * Test metrics for failed SQL cross cache queries. - * - * @throws Exception In case of error. + * @param cache Cache. + * @param qry Query. */ - public void testSqlCrossCacheQueryFailedMetrics() throws Exception { - IgniteCache<String, Integer> cache = grid(0).context().cache().jcache("A"); - - // Execute query. - SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Integer"); - + private void testQueryFailedMetrics(IgniteCache<String, Integer> cache, Query qry) { try { cache.query(qry).getAll(); } catch (Exception e) { - // No-op + // No-op. 
} - QueryMetrics m = cache.queryMetrics(); - - assert m != null; + GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(1, m.executions()); + assertEquals(0, m.completedExecutions()); assertEquals(1, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + assertTrue(m.averageTime() == 0); + assertTrue(m.maximumTime() == 0); + assertTrue(m.minimumTime() == 0); // Execute again with the same parameters. try { @@ -346,16 +322,29 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra // No-op. } - m = cache.queryMetrics(); - - assert m != null; + m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); info("Metrics: " + m); assertEquals(2, m.executions()); + assertEquals(0, m.completedExecutions()); assertEquals(2, m.fails()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + assertTrue(m.averageTime() == 0); + assertTrue(m.maximumTime() == 0); + assertTrue(m.minimumTime() == 0); + } + + /** + * @param cache Cache. + * @param exp Expected. + */ + private static void waitingForCompletion(final IgniteCache<String, Integer> cache, + final int exp) throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); + return m.completedExecutions() == exp; + } + }, 5000); } } \ No newline at end of file
