IGNITE-3227 - Added method to get partition size
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8af30781 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8af30781 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8af30781 Branch: refs/heads/ignite-3414 Commit: 8af307819dba9f9fea4946cb09df01c4ef146f8a Parents: 5b49dad Author: Alexey Goncharuk <[email protected]> Authored: Thu Jul 7 15:58:50 2016 -0700 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Jul 7 15:58:50 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 25 + .../processors/cache/GridCacheAdapter.java | 215 ++++++++- .../processors/cache/GridCacheProxyImpl.java | 36 ++ .../processors/cache/IgniteCacheProxy.java | 40 ++ .../processors/cache/IgniteInternalCache.java | 23 + .../cache/IgniteCacheAtomicPeekModesTest.java | 2 +- .../cache/IgniteCachePeekModesAbstractTest.java | 463 ++++++++++++++++++- .../multijvm/IgniteCacheProcessProxy.java | 46 +- 8 files changed, 846 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 3af2c44..8cefb4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -370,6 +370,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public long sizeLong(CachePeekMode... peekModes) throws CacheException; /** + * Gets the number of all entries cached in a partition as a long value. 
By default, if {@code peekModes} value + * isn't defined, only size of primary copies across all nodes will be returned. This behavior is identical to + * calling this method with {@link CachePeekMode#PRIMARY} peek mode. + * <p> + * NOTE: this operation is distributed and will query all participating nodes for their partition cache sizes. + * + * @param partition Partition. + * @param peekModes Optional peek modes. If not provided, then total partition cache size is returned. + * @return Partition cache size across all nodes. + */ + @IgniteAsyncSupported + public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException; + + /** * Gets the number of all entries cached on this node. By default, if {@code peekModes} value isn't defined, * only size of primary copies will be returned. This behavior is identical to calling this method with * {@link CachePeekMode#PRIMARY} peek mode. @@ -390,6 +404,17 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public long localSizeLong(CachePeekMode... peekModes); /** + * Gets the number of all entries cached on this node for the partition as a long value. By default, if {@code peekModes} value isn't + * defined, only size of primary copies will be returned. This behavior is identical to calling this method with + * {@link CachePeekMode#PRIMARY} peek mode. + * + * @param partition Partition. + * @param peekModes Optional peek modes. If not provided, then total cache size is returned. + * @return Cache size on this node. + */ + public long localSizeLong(int partition, CachePeekMode... peekModes); + + /** * @param map Map containing keys and entry processors to be applied to values. * @param args Additional arguments to pass to the {@link EntryProcessor}. 
* @return The map of {@link EntryProcessorResult}s of the processing per key, http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 028f516..55bd81d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -3965,6 +3966,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException { + if (isLocal()) + return localSizeLong(partition, peekModes); + + return sizeLongAsync(partition, peekModes).get(); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) { assert peekModes != null; @@ -4007,6 +4016,36 @@ public abstract class 
GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Long> sizeLongAsync(final int part, final CachePeekMode[] peekModes) { + assert peekModes != null; + + final PeekModes modes = parsePeekModes(peekModes, true); + + IgniteClusterEx cluster = ctx.grid().cluster(); + final GridCacheAffinityManager aff = ctx.affinity(); + final AffinityTopologyVersion topVer = aff.affinityTopologyVersion(); + + ClusterGroup grp = cluster.forDataNodes(name()); + + Collection<ClusterNode> nodes = grp.forPredicate(new IgnitePredicate<ClusterNode>() { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return clusterNode.version().compareTo(PartitionSizeLongTask.SINCE_VER) >= 0 && + ((modes.primary && aff.primary(clusterNode, part, topVer)) || + (modes.backup && aff.backup(clusterNode, part, topVer))); + } + }).nodes(); + + if (nodes.isEmpty()) + return new GridFinishedFuture<>(0L); + + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.kernalContext().task().execute( + new PartitionSizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, part), null); + } + + /** {@inheritDoc} */ @Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException { return (int)localSizeLong(peekModes); } @@ -4060,6 +4099,50 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException { + PeekModes modes = parsePeekModes(peekModes, true); + + long size = 0; + + AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); + + // Swap and offheap are disabled for near cache. + GridCacheSwapManager swapMgr = ctx.isNear() ? 
ctx.near().dht().context().swap() : ctx.swap(); + + if (ctx.isLocal()){ + modes.primary = true; + modes.backup = true; + + if (modes.heap) + size += size(); + + if (modes.swap) + size += swapMgr.swapEntriesCount(0); + + if (modes.offheap) + size += swapMgr.offheapEntriesCount(0); + } + else { + GridDhtLocalPartition dhtPart = ctx.topology().localPartition(part, topVer, false); + + if (dhtPart != null) { + if (modes.primary && dhtPart.primary(topVer) || modes.backup && dhtPart.backup(topVer)) { + if (modes.heap) + size += dhtPart.publicSize(); + + if (modes.swap) + size += swapMgr.swapEntriesCount(part); + + if (modes.offheap) + size += swapMgr.offheapEntriesCount(part); + } + } + } + + return size; + } + + /** {@inheritDoc} */ @Override public int size() { return map.publicSize(); } @@ -5637,6 +5720,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Internal callable for partition size calculation. + */ + @GridInternal + private static class PartitionSizeLongJob extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Partition. */ + private final int partition; + + /** Peek modes. */ + private final CachePeekMode[] peekModes; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param peekModes Cache peek modes. + * @param partition partition. 
+ */ + private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, int partition) { + super(cacheName, topVer); + + this.peekModes = peekModes; + this.partition = partition; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + if (cache == null) + return 0; + + try { + return cache.localSizeLong(partition, peekModes); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(PartitionSizeLongJob.class, this); + } + } + + /** * Internal callable for global size calculation. */ @GridInternal @@ -6610,7 +6739,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - public SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + private SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { this.cacheName = cacheName; this.topVer = topVer; this.peekModes = peekModes; @@ -6655,6 +6784,90 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Partition Size Long task. + */ + private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.30"); + + /** Partition */ + private final int partition; + + /** Cache name. */ + private final String cacheName; + + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; + + /** Peek modes. */ + private final CachePeekMode[] peekModes; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param peekModes Cache peek modes. + * @param partition partition. 
+ */ + private PartitionSizeLongTask( + String cacheName, + AffinityTopologyVersion topVer, + CachePeekMode[] peekModes, + int partition + ) { + this.cacheName = cacheName; + this.topVer = topVer; + this.peekModes = peekModes; + this.partition = partition; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map( + List<ClusterNode> subgrid, + @Nullable Object arg + ) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) + jobs.put(new PartitionSizeLongJob(cacheName, topVer, peekModes, partition), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + IgniteException e = res.getException(); + + if (e != null) { + if (e instanceof ClusterTopologyException) + return ComputeJobResultPolicy.WAIT; + + throw new IgniteException("Remote job threw exception.", e); + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException { + long size = 0; + + for (ComputeJobResult res : results) { + if (res != null) { + if (res.getException() == null) + size += res.<Long>getData(); + else + throw res.getException(); + } + } + + return size; + } + } + + /** * Clear task. 
*/ private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> { http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 99dd608..b46c4dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1483,6 +1483,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.sizeLong(partition, peekModes); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes) { CacheOperationContext prev = gate.enter(opCtx); @@ -1507,6 +1519,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.sizeLongAsync(partition, peekModes); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -1531,6 +1555,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public long 
localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.localSizeLong(partition, peekModes); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public int nearSize() { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 12ec8b8..92e59db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -840,6 +840,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ + @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) { + setFuture(delegate.sizeLongAsync(part, peekModes)); + + return 0; + } + else + return delegate.sizeLong(part, peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { GridCacheGateway<K, V> gate = this.gate; @@ -874,6 +897,23 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public long localSizeLong(int part, CachePeekMode... 
peekModes) { + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + return delegate.localSizeLong(part, peekModes); + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + finally { + onLeave(gate, prev); + } + } + + /** {@inheritDoc} */ @Override public V get(K key) { try { GridCacheGateway<K, V> gate = this.gate; http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index d155b0e..4dc9a23f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1453,6 +1453,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public long localSizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException; /** + * @param partition partition. + * @param peekModes Peek modes. + * @return Local cache size as a long value. + * @throws IgniteCheckedException If failed. + */ + public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException; + + /** * @param peekModes Peek modes. * @return Global cache size. * @throws IgniteCheckedException If failed. @@ -1467,6 +1475,14 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public long sizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException; /** + * @param partition partition + * @param peekModes Peek modes. + * @return Global cache size as a long value. + * @throws IgniteCheckedException If failed. 
+ */ + public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException; + + /** * @param peekModes Peek modes. * @return Future. */ @@ -1479,6 +1495,13 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { public IgniteInternalFuture<Long> sizeLongAsync(CachePeekMode[] peekModes); /** + * @param partition Partition. + * @param peekModes Peek modes. + * @return Future. + */ + public IgniteInternalFuture<Long> sizeLongAsync(int partition, CachePeekMode[] peekModes); + + /** * Gets size of near cache key set. This method will return count of all entries in near * cache and has O(1) complexity on base cache projection. * <p> http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java index 4270bab..8b7859a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicPeekModesTest.java @@ -48,4 +48,4 @@ public class IgniteCacheAtomicPeekModesTest extends IgniteCachePeekModesAbstract @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { return PRIMARY; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java index c27cccb..5dc059b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java @@ -28,6 +28,7 @@ import java.util.Set; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -38,6 +39,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.spi.IgniteSpiCloseableIterator; import org.apache.ignite.spi.swapspace.SwapSpaceSpi; @@ -501,6 +503,144 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra } /** + * @throws Exception If failed. 
+ */ + public void testLocalPartitionSize() throws Exception { + if (cacheMode() != LOCAL) + return; + + awaitPartitionMapExchange(); + checkEmpty(); + int part = 0; + IgniteCache<Integer, String> cache0 = jcache(0); + + IgniteCache<Integer, String> cacheAsync0 = cache0.withAsync(); + + for (int i = 0; i < HEAP_ENTRIES; i++) { + cache0.put(i, String.valueOf(i)); + + final long size = i + 1; + + assertEquals(size, cache0.localSize()); + assertEquals(size, cache0.localSizeLong(part, PRIMARY)); + assertEquals(size, cache0.localSizeLong(part, BACKUP)); + assertEquals(size, cache0.localSizeLong(part, NEAR)); + assertEquals(size, cache0.localSizeLong(part, ALL)); + + assertEquals(size, cache0.size()); + assertEquals(size, cache0.sizeLong(part, PRIMARY)); + assertEquals(size, cache0.sizeLong(part, BACKUP)); + assertEquals(size, cache0.sizeLong(part, NEAR)); + assertEquals(size, cache0.sizeLong(part, ALL)); + + cacheAsync0.size(); + + assertEquals(size, (long) cacheAsync0.<Integer>future().get()); + + cacheAsync0.sizeLong(part, PRIMARY); + + assertEquals(size, cacheAsync0.future().get()); + } + + for (int i = 0; i < HEAP_ENTRIES; i++) { + cache0.remove(i, String.valueOf(i)); + + final int size = HEAP_ENTRIES - i - 1; + + assertEquals(size, cache0.localSize()); + assertEquals(size, cache0.localSizeLong(part, PRIMARY)); + assertEquals(size, cache0.localSizeLong(part, BACKUP)); + assertEquals(size, cache0.localSizeLong(part, NEAR)); + assertEquals(size, cache0.localSizeLong(part, ALL)); + + assertEquals(size, cache0.size()); + assertEquals(size, cache0.sizeLong(part, PRIMARY)); + assertEquals(size, cache0.sizeLong(part, BACKUP)); + assertEquals(size, cache0.sizeLong(part, NEAR)); + assertEquals(size, cache0.sizeLong(part, ALL)); + + cacheAsync0.size(); + + assertEquals(size, (long) cacheAsync0.<Integer>future().get()); + } + } + + /** + * @throws InterruptedException If failed. 
+ */ + public void testLocalPartitionSizeFlags() throws InterruptedException { + if (cacheMode() != LOCAL) + return; + + awaitPartitionMapExchange(); + checkEmpty(); + int part = 0; + IgniteCache<Integer, String> cache0 = jcache(0); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 200; i++) { + cache0.put(i, "test_val"); + + keys.add(i); + } + + try { + int totalKeys = 200; + + T2<Integer, Integer> swapKeys = swapKeysCount(0); + + T2<Integer, Integer> offheapKeys = offheapKeysCount(0); + + int totalSwap = swapKeys.get1() + swapKeys.get2(); + int totalOffheap = offheapKeys.get1() + offheapKeys.get2(); + + log.info("Keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']'); + + assertTrue(totalSwap + totalOffheap < totalKeys); + + assertEquals(totalKeys, cache0.localSize()); + assertEquals(totalKeys, cache0.localSizeLong(part, ALL)); + + assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP)); + assertEquals(totalSwap, cache0.localSizeLong(part, SWAP)); + assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP)); + + assertEquals(totalOffheap, cache0.sizeLong(part, OFFHEAP)); + assertEquals(totalSwap, cache0.sizeLong(part, SWAP)); + assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.sizeLong(part, ONHEAP)); + + assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, PRIMARY)); + assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, PRIMARY)); + assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, PRIMARY)); + + assertEquals(totalOffheap, cache0.localSizeLong(part, OFFHEAP, BACKUP)); + assertEquals(totalSwap, cache0.localSizeLong(part, SWAP, BACKUP)); + assertEquals(totalKeys - (totalSwap + totalOffheap), cache0.localSizeLong(part, ONHEAP, BACKUP)); + } + finally { + cache0.removeAll(keys); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testNonLocalPartitionSize() throws Exception { + if (cacheMode() == LOCAL) + return; + + awaitPartitionMapExchange(true, true); + + checkEmpty(); + + for (int i = 0; i < gridCount(); i++) { + checkPartitionSizeAffinityFilter(i); + checkPartitionSizeStorageFilter(i); + } + } + + /** * @param nodeIdx Node index. * @throws Exception If failed. */ @@ -627,6 +767,164 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra } /** + * @param nodeIdx Node index. + * @throws Exception If failed. + */ + private void checkPartitionSizeAffinityFilter(int nodeIdx) throws Exception { + IgniteCache<Integer, String> cache0 = jcache(nodeIdx); + + final int PUT_KEYS = 10; + + int part = nodeIdx; + + List<Integer> keys = null; + + try { + if (cacheMode() == REPLICATED) { + keys = backupKeys(cache0, 10, 0); + + for (Integer key : keys) + cache0.put(key, String.valueOf(key)); + + int partSize = 0; + + for (Integer key : keys){ + int keyPart = ignite(nodeIdx).affinity(null).partition(key); + if (keyPart == part) + partSize++; + } + + assertEquals(PUT_KEYS, cache0.localSize(BACKUP)); + assertEquals(PUT_KEYS, cache0.localSize(ALL)); + assertEquals(partSize, cache0.localSizeLong(part, BACKUP)); + assertEquals(partSize, cache0.localSizeLong(part, ALL)); + assertEquals(0, cache0.localSizeLong(part, PRIMARY)); + assertEquals(0, cache0.localSizeLong(part, NEAR)); + + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Integer, String> cache = jcache(i); + assertEquals(0, cache.size(NEAR)); + assertEquals(partSize, cache.sizeLong(part, PRIMARY)); + assertEquals(partSize * (gridCount() - 1), cache.sizeLong(part, BACKUP)); + assertEquals(partSize * gridCount(), cache.sizeLong(part, PRIMARY, BACKUP)); + assertEquals(partSize * gridCount(), cache.sizeLong(part, ALL)); // Primary + backups. 
+ } + } + else { + keys = nearKeys(cache0, PUT_KEYS, 0); + + for (Integer key : keys) + cache0.put(key, String.valueOf(key)); + + int partSize = 0; + + for (Integer key :keys){ + int keyPart = ignite(nodeIdx).affinity(null).partition(key); + if(keyPart == part) + partSize++; + } + + if (hasNearCache()) { + assertEquals(0, cache0.localSize()); + assertEquals(0, cache0.localSizeLong(part, ALL)); + assertEquals(0, cache0.localSizeLong(part, NEAR)); + + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Integer, String> cache = jcache(i); + + assertEquals(0, cache.sizeLong(part, NEAR)); + assertEquals(partSize, cache.sizeLong(part, BACKUP)); + assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP)); + assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups + near. + } + } + else { + assertEquals(0, cache0.localSize()); + //assertEquals(partitionSize, cache0.localSizeLong(partition, ALL)); + assertEquals(0, cache0.localSizeLong(part, NEAR)); + + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Integer, String> cache = jcache(i); + + assertEquals(0, cache.size(NEAR)); + assertEquals(partSize, cache.sizeLong(part, BACKUP)); + assertEquals(partSize * 2, cache.sizeLong(part, PRIMARY, BACKUP)); + assertEquals(partSize * 2, cache.sizeLong(part, ALL)); // Primary + backups. 
+ } + } + + assertEquals(0, cache0.localSize(BACKUP)); + assertEquals(0, cache0.localSize(PRIMARY)); + } + + checkPrimarySize(PUT_KEYS); + + Affinity<Integer> aff = ignite(0).affinity(null); + + for (int i = 0; i < gridCount(); i++) { + if (i == nodeIdx) + continue; + + ClusterNode node = ignite(i).cluster().localNode(); + + int primary = 0; + int backups = 0; + + for (Integer key : keys) { + if (aff.isPrimary(node, key) && aff.partition(key) == part) + primary++; + else if (aff.isBackup(node, key) && aff.partition(key) == part) + backups++; + } + + IgniteCache<Integer, String> cache = jcache(i); + + assertEquals(primary, cache.localSizeLong(part, PRIMARY)); + assertEquals(backups, cache.localSizeLong(part, BACKUP)); + assertEquals(primary + backups, cache.localSizeLong(part, PRIMARY, BACKUP)); + assertEquals(primary + backups, cache.localSizeLong(part, BACKUP, PRIMARY)); + assertEquals(primary + backups, cache.localSizeLong(part, ALL)); + } + + cache0.remove(keys.get(0)); + + keys.remove(0); + + checkPrimarySize(PUT_KEYS - 1); + + int primary = 0; + int backups = 0; + + ClusterNode node = ignite(nodeIdx).cluster().localNode(); + + for (Integer key : keys) { + if (aff.isPrimary(node, key) && aff.partition(key) == part) + primary++; + else if (aff.isBackup(node, key) && aff.partition(key) == part) + backups++; + } + + if (cacheMode() == REPLICATED) { + assertEquals(primary+backups, cache0.localSizeLong(part, ALL)); + assertEquals(primary, cache0.localSizeLong(part, PRIMARY)); + assertEquals(backups, cache0.localSizeLong(part, BACKUP)); + } + else { + if (hasNearCache()) + assertEquals(0, cache0.localSizeLong(part, ALL)); + else + assertEquals(0, cache0.localSizeLong(part, ALL)); + } + } + finally { + if (keys != null) + cache0.removeAll(new HashSet<>(keys)); + } + + checkEmpty(); + } + + /** * Checks size is zero. 
*/ private void checkEmpty() { @@ -695,6 +993,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra /** * @param nodeIdx Node index. + * @param part Cache partition + * @return Tuple with number of primary and backup keys (one or both will be zero). + */ + private T2<Integer, Integer> swapKeysCount(int nodeIdx, int part) throws IgniteCheckedException { + GridCacheContext ctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context(); + // Swap and offheap are disabled for near cache. + GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); + //First count entries... + int cnt = (int)swapMgr.swapEntriesCount(part); + + GridCacheAffinityManager affinity = ctx.affinity(); + AffinityTopologyVersion topVer = affinity.affinityTopologyVersion(); + + //And then find out whether they are primary or backup ones. + int primaryCnt = 0; + int backupCnt = 0; + if (affinity.primary(ctx.localNode(), part, topVer)) + primaryCnt = cnt; + else if (affinity.backup(ctx.localNode(), part, topVer)) + backupCnt = cnt; + return new T2<>(primaryCnt, backupCnt); + } + + /** + * @param nodeIdx Node index. * @return Tuple with primary and backup keys. */ private T2<List<Integer>, List<Integer>> offheapKeys(int nodeIdx) { @@ -742,6 +1065,31 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra /** * @param nodeIdx Node index. + * @param part Cache partition. + * @return Tuple with number of primary and backup keys (one or both will be zero). + */ + private T2<Integer, Integer> offheapKeysCount(int nodeIdx, int part) throws IgniteCheckedException { + GridCacheContext ctx = ((IgniteEx)ignite(nodeIdx)).context().cache().internalCache().context(); + // Swap and offheap are disabled for near cache. + GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); + //First count entries... 
+        int cnt = (int)swapMgr.offheapEntriesCount(part);
+
+        GridCacheAffinityManager affinity = ctx.affinity();
+        AffinityTopologyVersion topVer = affinity.affinityTopologyVersion();
+
+        //And then find out whether they are primary or backup ones.
+        int primaryCnt = 0;
+        int backupCnt = 0;
+        if (affinity.primary(ctx.localNode(), part, topVer))
+            primaryCnt = cnt;
+        else if (affinity.backup(ctx.localNode(), part, topVer))
+            backupCnt = cnt;
+        return new T2<>(primaryCnt, backupCnt);
+    }
+
+    /**
+     * @param nodeIdx Node index.
      * @throws Exception If failed.
      */
     private void checkSizeStorageFilter(int nodeIdx) throws Exception {
@@ -862,6 +1210,119 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
     }
     /**
+     * @param nodeIdx Node index.
+     * @throws Exception If failed.
+     */
+    private void checkPartitionSizeStorageFilter(int nodeIdx) throws Exception {
+        IgniteCache<Integer, String> cache0 = jcache(nodeIdx);
+
+        int part = nodeIdx;
+
+        List<Integer> primaryKeys = primaryKeys(cache0, 100, 10_000);
+        List<Integer> backupKeys = backupKeys(cache0, 100, 10_000);
+
+        try {
+            final String val = "test_value";
+
+            for (int i = 0; i < 100; i++) {
+                cache0.put(primaryKeys.get(i), val);
+                cache0.put(backupKeys.get(i), val);
+            }
+
+
+            int totalKeys = 200;
+
+            T2<Integer, Integer> swapKeys = swapKeysCount(nodeIdx, part);
+
+            T2<Integer, Integer> offheapKeys = offheapKeysCount(nodeIdx, part);
+
+            int totalSwap = swapKeys.get1() + swapKeys.get2();
+            int totalOffheap = offheapKeys.get1() + offheapKeys.get2();
+
+            log.info("Local keys [total=" + totalKeys + ", offheap=" + offheapKeys + ", swap=" + swapKeys + ']');
+
+            assertTrue(totalSwap + totalOffheap < totalKeys);
+
+            assertEquals(primaryKeys.size(), cache0.localSize());
+            assertEquals(totalKeys, cache0.localSize(ALL));
+            assertEquals(totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+            assertEquals(totalSwap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+            assertEquals((long)swapKeys.get1(), cache0.localSizeLong(part, SWAP, PRIMARY));
+            assertEquals((long)swapKeys.get2(), cache0.localSizeLong(part, SWAP, BACKUP));
+
+            assertEquals((long)offheapKeys.get1(), cache0.localSizeLong(part, OFFHEAP, PRIMARY));
+            assertEquals((long)offheapKeys.get2(), cache0.localSizeLong(part, OFFHEAP, BACKUP));
+
+            assertEquals(swapKeys.get1() + offheapKeys.get1(), cache0.localSizeLong(part, SWAP, OFFHEAP, PRIMARY));
+            assertEquals(swapKeys.get2() + offheapKeys.get2(), cache0.localSizeLong(part, SWAP, OFFHEAP, BACKUP));
+
+            assertEquals(totalSwap + totalOffheap, cache0.localSizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+
+            int globalParitionSwapPrimary = 0;
+            int globalPartSwapBackup = 0;
+
+            int globalPartOffheapPrimary = 0;
+            int globalPartOffheapBackup = 0;
+
+            for (int i = 0; i < gridCount(); i++) {
+                T2<Integer, Integer> swap = swapKeysCount(i, part);
+
+                globalParitionSwapPrimary += swap.get1();
+                globalPartSwapBackup += swap.get2();
+
+                T2<Integer, Integer> offheap = offheapKeysCount(i, part);
+
+                globalPartOffheapPrimary += offheap.get1();
+                globalPartOffheapBackup += offheap.get2();
+            }
+
+            int backups;
+
+            if (cacheMode() == LOCAL)
+                backups = 0;
+            else if (cacheMode() == PARTITIONED)
+                backups = 1;
+            else // REPLICATED.
+                backups = gridCount() - 1;
+
+            int globalTotal = totalKeys + totalKeys * backups;
+            int globalPartTotalSwap = globalParitionSwapPrimary + globalPartSwapBackup;
+            int globalPartTotalOffheap = globalPartOffheapPrimary + globalPartOffheapBackup;
+
+            log.info("Global keys [total=" + globalTotal +
+                ", offheap=" + globalPartTotalOffheap +
+                ", swap=" + globalPartTotalSwap + ']');
+
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Integer, String> cache = jcache(i);
+
+                assertEquals(totalKeys, cache.size(PRIMARY));
+                assertEquals(globalTotal, cache.size(ALL));
+                assertEquals(globalTotal, cache.size(PRIMARY, BACKUP, NEAR, ONHEAP, OFFHEAP, SWAP));
+                assertEquals(globalTotal, cache.size(ONHEAP, OFFHEAP, SWAP, PRIMARY, BACKUP));
+
+                assertEquals(globalPartTotalSwap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP));
+                assertEquals(globalParitionSwapPrimary, cache.sizeLong(part, SWAP, PRIMARY));
+                assertEquals(globalPartSwapBackup, cache.sizeLong(part, SWAP, BACKUP));
+
+                assertEquals(globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, OFFHEAP));
+                assertEquals(globalPartOffheapPrimary, cache.sizeLong(part, OFFHEAP, PRIMARY));
+                assertEquals(globalPartOffheapBackup, cache.sizeLong(part, OFFHEAP, BACKUP));
+
+                assertEquals(globalPartTotalSwap + globalPartTotalOffheap, cache.sizeLong(part, PRIMARY, BACKUP, NEAR, SWAP, OFFHEAP));
+                assertEquals(globalParitionSwapPrimary + globalPartOffheapPrimary, cache.sizeLong(part, SWAP, OFFHEAP, PRIMARY));
+                assertEquals(globalPartSwapBackup + globalPartOffheapBackup, cache.sizeLong(part, SWAP, OFFHEAP, BACKUP));
+            }
+        }
+        finally {
+            cache0.removeAll(new HashSet<>(primaryKeys));
+            cache0.removeAll(new HashSet<>(backupKeys));
+        }
+
+        checkEmpty();
+    }
+
+    /**
      * @param exp Expected size.
      */
     private void checkPrimarySize(int exp) {
@@ -1167,4 +1628,4 @@ public abstract class IgniteCachePeekModesAbstractTest extends IgniteCacheAbstra
         assertTrue("Expected entries not found: " + allExp, allExp.isEmpty());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8af30781/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index f2f69dd..36a56f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -207,6 +207,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
     /** {@inheritDoc} */
+    @Override public long sizeLong(int partition, CachePeekMode... peekModes) throws CacheException {
+        return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, false));
+    }
+
+    /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
         return compute.call(new SizeTask(cacheName, isAsync, peekModes, true));
     }
@@ -217,6 +222,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
     /** {@inheritDoc} */
+    @Override public long localSizeLong(int partition, CachePeekMode... peekModes) {
+        return compute.call(new PartitionSizeLongTask(cacheName, isAsync, peekModes, partition, true));
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args)
@@ -703,6 +713,40 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     /**
      *
      */
+    private static class PartitionSizeLongTask extends CacheTaskAdapter<Void, Void, Long> {
+
+        /** Partition. */
+        int partition;
+
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
+
+        /** Local. */
+        private final boolean loc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param async Async.
+         * @param peekModes Peek modes.
+         * @param partition partition.
+         * @param loc Local.
+         */
+        public PartitionSizeLongTask(String cacheName, boolean async, CachePeekMode[] peekModes, int partition, boolean loc) {
+            super(cacheName, async, null);
+            this.loc = loc;
+            this.peekModes = peekModes;
+            this.partition = partition;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Long call() throws Exception {
+            return loc ? cache().localSizeLong(partition, peekModes) : cache().sizeLong(partition, peekModes);
+        }
+    }
+
+    /**
+     *
+     */
     private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> {
         /** Key. */
         private final K key;
@@ -1499,4 +1543,4 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
             return async ? cache.withAsync() : cache;
         }
     }
-}
\ No newline at end of file
+}
