#ignite-286: refactoring.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9373e43d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9373e43d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9373e43d Branch: refs/heads/ignite-286 Commit: 9373e43da7698d57d3e241780ec1cddb17882f47 Parents: 529db75 Author: ivasilinets <[email protected]> Authored: Mon Apr 27 17:38:47 2015 +0300 Committer: ivasilinets <[email protected]> Committed: Mon Apr 27 17:38:47 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheSwapManager.java | 28 ++++-- .../cache/query/GridCacheQueryManager.java | 98 +++----------------- .../ignite/internal/util/IgniteUtils.java | 52 +++++++++++ 3 files changed, 84 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373e43d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 6fd5d64..8b33c19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return rawOffHeapIterator(true, true); if (swapEnabled() && !offHeapEnabled()) - return rawSwapIterator(); + return rawSwapIterator(true, true); // Both, swap and off-heap are enabled. return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() { @@ -1254,7 +1254,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapFlag) { offheapFlag = false; - it = rawSwapIterator(); + it = rawSwapIterator(true, true); if (!it.hasNext()) { it.close(); @@ -1604,13 +1604,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ private int[] partitions(boolean primary, boolean backup) { if (primary && backup) - return cctx.grid().affinity(cctx.name()).allPartitions(cctx.grid().localNode()); + return cctx.grid().affinity(cctx.name()).allPartitions(cctx.localNode()); if (primary) - return cctx.grid().affinity(cctx.name()).primaryPartitions(cctx.grid().localNode()); + return cctx.grid().affinity(cctx.name()).primaryPartitions(cctx.localNode()); if (backup) - return cctx.grid().affinity(cctx.name()).backupPartitions(cctx.grid().localNode()); + return cctx.grid().affinity(cctx.name()).backupPartitions(cctx.localNode()); return new int[0]; } @@ -1635,15 +1635,27 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @return Raw off-heap iterator. + * @param primary Include primaries. + * @param backup Include backups. * @throws IgniteCheckedException If failed. */ - public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator() throws IgniteCheckedException { - if (!swapEnabled) + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup) throws IgniteCheckedException { + if (!swapEnabled || (!primary && !backup)) return new GridEmptyCloseableIterator<>(); checkIteratorQueue(); - return swapMgr.rawIterator(spaceName); + if (primary && backup) + return swapMgr.rawIterator(spaceName); + + int[] parts = partitions(primary, backup); + + List<GridIterator<Map.Entry<byte[], byte[]>>> iterators = new ArrayList<>(); + + for (int i = 0; i < parts.length; ++i) + iterators.add(swapMgr.rawIterator(spaceName, parts[i])); + + return U.compoudIterator(iterators); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373e43d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 1aac877..118e15f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -774,18 +774,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final ExpiryPolicy plc = cctx.expiry(); - final boolean includeBackups = qry.includeBackups() || cctx.isReplicated(); - - Set<Integer> parts = includeBackups ? null : - cctx.affinity().primaryPartitions(cctx.nodeId(), cctx.affinity().affinityTopologyVersion()); + final boolean backups = qry.includeBackups() || cctx.isReplicated(); final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { private IgniteBiTuple<K, V> next; private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = includeBackups ? - prj.keySet().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); { advance(); @@ -881,12 +877,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte iters.add(heapIt); if (cctx.isOffHeapEnabled()) - iters.add(offheapIterator(qry, includeBackups)); + iters.add(offheapIterator(qry, backups)); if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry, parts)); + iters.add(swapIterator(qry, backups)); - it = new CompoundIterator<>(iters); + it = U.compoudIterator(iters); } else it = heapIt; @@ -918,47 +914,38 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param qry Query. - * @param parts Collection of partitions. + * @param backups Include backups. * @return Swap iterator. * @throws IgniteCheckedException If failed. */ - private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, Collection<Integer> parts) + private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, boolean backups) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); Iterator<Map.Entry<byte[], byte[]>> it; - if (parts == null) - it = cctx.swap().rawSwapIterator(); - else { - List<GridIterator<Map.Entry<byte[], byte[]>>> partIts = new ArrayList<>(); - - for (Integer part : parts) - partIts.add(cctx.swap().rawSwapIterator(part)); - - it = new CompoundIterator(partIts); - } + it = cctx.swap().rawSwapIterator(true, backups); return scanIterator(it, filter, qry.keepPortable()); } /** * @param qry Query. - * @param includeBackups IncludeBackups. + * @param backups Include backups. * @return Offheap iterator. * @throws IgniteCheckedException If failed. */ - private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean includeBackups) + private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, boolean backups) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); - return cctx.swap().rawOffHeapIterator(c, true, includeBackups); + return cctx.swap().rawOffHeapIterator(c, true, backups); } else { - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, includeBackups); + Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups); return scanIterator(it, filter, qry.keepPortable()); } @@ -2528,67 +2515,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * */ - private static class CompoundIterator<T> extends GridIteratorAdapter<T> { - /** */ - private static final long serialVersionUID = 4585888051556166304L; - - /** */ - private final List<GridIterator<T>> iters; - - /** */ - private int idx; - - /** */ - private GridIterator<T> iter; - - /** - * @param iters Iterators. - */ - private CompoundIterator(List<GridIterator<T>> iters) { - if (iters.isEmpty()) - throw new IllegalArgumentException(); - - this.iters = iters; - - iter = F.first(iters); - } - - /** {@inheritDoc} */ - @Override public boolean hasNextX() throws IgniteCheckedException { - if (iter.hasNextX()) - return true; - - idx++; - - while(idx < iters.size()) { - iter = iters.get(idx); - - if (iter.hasNextX()) - return true; - - idx++; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public T nextX() throws IgniteCheckedException { - if (!hasNextX()) - throw new NoSuchElementException(); - - return iter.nextX(); - } - - /** {@inheritDoc} */ - @Override public void removeX() throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - } - - /** - * - */ private class GridCacheScanSwapEntry implements Cache.Entry<K, V> { /** */ private final AbstractLazySwapEntry e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9373e43d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index cb56650..3bc7ed7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9017,4 +9017,56 @@ public abstract class IgniteUtils { return hasShmem; } + + public static <T> GridCloseableIteratorAdapter<T> compoudIterator(final List<GridIterator<T>> allIters) { + return new GridCloseableIteratorAdapter<T> () { + /** */ + private static final long serialVersionUID = 4585888051556166304L; + + /** */ + private final List<GridIterator<T>> iters; + + /** */ + private int idx; + + /** */ + private GridIterator<T> iter; + + { + if (allIters.isEmpty()) + throw new IllegalArgumentException(); + + iters = allIters; + + iter = F.first(allIters); + } + + /** {@inheritDoc} */ + @Override public boolean onHasNext() throws IgniteCheckedException { + if (iter.hasNextX()) + return true; + + idx++; + + while(idx < iters.size()) { + iter = iters.get(idx); + + if (iter.hasNextX()) + return true; + + idx++; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public T onNext() throws IgniteCheckedException { + if (!hasNextX()) + throw new NoSuchElementException(); + + return iter.nextX(); + } + }; + } }
