IGNITE-6846 Add metrics for entry processor invocations. - Fixes #3148. Signed-off-by: Dmitriy Pavlov <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e8669af Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e8669af Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e8669af Branch: refs/heads/master Commit: 5e8669af709e8a4bfd41997212ee471efe4e6f36 Parents: bcda7a1 Author: voipp <[email protected]> Authored: Tue Jul 31 16:01:39 2018 +0300 Committer: Dmitriy Pavlov <[email protected]> Committed: Tue Jul 31 16:01:39 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/CacheMetrics.java | 77 ++++ .../cache/CacheClusterMetricsMXBeanImpl.java | 55 +++ .../cache/CacheLocalMetricsMXBeanImpl.java | 55 +++ .../processors/cache/CacheMetricsImpl.java | 206 +++++++++ .../processors/cache/CacheMetricsSnapshot.java | 157 +++++++ .../processors/cache/GridCacheAdapter.java | 67 ++- .../processors/cache/GridCacheEntryEx.java | 8 +- .../processors/cache/GridCacheMapEntry.java | 115 +++-- .../cache/GridCacheUpdateAtomicResult.java | 15 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 5 + .../GridDhtAtomicAbstractUpdateFuture.java | 8 +- .../GridDhtAtomicAbstractUpdateRequest.java | 16 +- .../dht/atomic/GridDhtAtomicCache.java | 63 ++- .../GridDhtAtomicSingleUpdateRequest.java | 11 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 5 +- .../distributed/near/GridNearAtomicCache.java | 14 +- .../local/atomic/GridLocalAtomicCache.java | 116 ++++-- .../cache/transactions/IgniteTxAdapter.java | 2 + .../platform/cache/PlatformCache.java | 11 + .../cache/GridCacheAbstractMetricsSelfTest.java | 417 +++++++++++++++++++ .../processors/cache/GridCacheTestEntryEx.java | 8 +- .../GridCacheNearAtomicMetricsSelfTest.java | 35 ++ .../near/GridCacheNearMetricsSelfTest.java | 388 +++++++++++++++++ .../platform/PlatformCacheWriteMetricsTask.java | 55 +++ .../IgniteCacheMetricsSelfTestSuite.java | 2 + .../Cache/CacheMetricsTest.cs | 11 + 
.../Apache.Ignite.Core/Cache/ICacheMetrics.cs | 88 ++++ .../Impl/Cache/CacheMetricsImpl.cs | 79 +++- 28 files changed, 1999 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index e3e6446..d3a4c04 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -72,6 +72,83 @@ public interface CacheMetrics { public long getCachePuts(); /** + * The total number of cache invocations, caused update. + * + * @return The number of invocation updates. + */ + public long getEntryProcessorPuts(); + + /** + * The total number of cache invocations, caused removal. + * + * @return The number of invocation removals. + */ + public long getEntryProcessorRemovals(); + + /** + * The total number of cache invocations, caused no updates. + * + * @return The number of read-only invocations. + */ + public long getEntryProcessorReadOnlyInvocations(); + + /** + * The total number of cache invocations. + * + * @return The number of cache invocations. + */ + public long getEntryProcessorInvocations(); + + /** + * The total number of invocations on keys, which exist in cache. + * + * @return The number of cache invocation hits. + */ + public long getEntryProcessorHits(); + + /** + * The percentage of invocations on keys, which exist in cache. + * + * @return The percentage of successful invocation hits. + */ + public float getEntryProcessorHitPercentage(); + + /** + * The total number of invocations on keys, which don't exist in cache. + * + * @return The number of cache invocation misses. 
+ */ + public long getEntryProcessorMisses(); + + /** + * The percentage of invocations on keys, which don't exist in cache. + * + * @return The percentage of invocation misses. + */ + public float getEntryProcessorMissPercentage(); + + /** + * The mean time to execute cache invokes. + * + * @return The time in µs. + */ + public float getEntryProcessorAverageInvocationTime(); + + /** + * So far, the minimum time to execute cache invokes. + * + * @return The time in µs. + */ + public float getEntryProcessorMinInvocationTime(); + + /** + * So far, the maximum time to execute cache invokes. + * + * @return The time in µs. + */ + public float getEntryProcessorMaxInvocationTime(); + + /** * The total number of removals from the cache. This does not include evictions, * where the cache itself initiates the removal to make space. * http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index 8935a98..cbd0b57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -290,6 +290,61 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getEntryProcessorPuts() { + return cache.clusterMetrics().getEntryProcessorPuts(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorRemovals() { + return cache.clusterMetrics().getEntryProcessorRemovals(); + } + + /** {@inheritDoc} */ + @Override public long 
getEntryProcessorReadOnlyInvocations() { + return cache.clusterMetrics().getEntryProcessorReadOnlyInvocations(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorInvocations() { + return cache.clusterMetrics().getEntryProcessorInvocations(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorHits() { + return cache.clusterMetrics().getEntryProcessorHits(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorHitPercentage() { + return cache.clusterMetrics().getEntryProcessorHitPercentage(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMissPercentage() { + return cache.clusterMetrics().getEntryProcessorMissPercentage(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorMisses() { + return cache.clusterMetrics().getEntryProcessorMisses(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorAverageInvocationTime() { + return cache.clusterMetrics().getEntryProcessorAverageInvocationTime(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMinInvocationTime() { + return cache.clusterMetrics().getEntryProcessorMinInvocationTime(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMaxInvocationTime() { + return cache.clusterMetrics().getEntryProcessorMaxInvocationTime(); + } + + /** {@inheritDoc} */ @Override public long getCacheRemovals() { return cache.clusterMetrics().getCacheRemovals(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index 212c7a0..790fe00 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -286,6 +286,61 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getEntryProcessorPuts() { + return cache.metrics0().getEntryProcessorPuts(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorRemovals() { + return cache.metrics0().getEntryProcessorRemovals(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorReadOnlyInvocations() { + return cache.metrics0().getEntryProcessorReadOnlyInvocations(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorInvocations() { + return cache.metrics0().getEntryProcessorInvocations(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorHits() { + return cache.metrics0().getEntryProcessorHits(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorHitPercentage() { + return cache.metrics0().getEntryProcessorHitPercentage(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMissPercentage() { + return cache.metrics0().getEntryProcessorMissPercentage(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorMisses() { + return cache.metrics0().getEntryProcessorMisses(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorAverageInvocationTime() { + return cache.metrics0().getEntryProcessorAverageInvocationTime(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMinInvocationTime() { + return cache.metrics0().getEntryProcessorMinInvocationTime(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMaxInvocationTime() { + return cache.metrics0().getEntryProcessorMaxInvocationTime(); + } + + /** {@inheritDoc} */ @Override public long getCacheRemovals() { return 
cache.metrics0().getCacheRemovals(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 0f6d06f..e81e995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -51,6 +51,30 @@ public class CacheMetricsImpl implements CacheMetrics { /** Number of reads. */ private AtomicLong reads = new AtomicLong(); + /** Number of invocations caused update. */ + private AtomicLong entryProcessorPuts = new AtomicLong(); + + /** Number of invocations caused removal. */ + private AtomicLong entryProcessorRemovals = new AtomicLong(); + + /** Number of invocations caused no updates. */ + private AtomicLong entryProcessorReadOnlyInvocations = new AtomicLong(); + + /** Entry processor invoke time taken nanos. */ + private AtomicLong entryProcessorInvokeTimeNanos = new AtomicLong(); + + /** So far, the minimum time to execute cache invokes. */ + private AtomicLong entryProcessorMinInvocationTime = new AtomicLong(); + + /** So far, the maximum time to execute cache invokes. */ + private AtomicLong entryProcessorMaxInvocationTime = new AtomicLong(); + + /** Number of entry processor invokes on keys, which exist in cache. */ + private AtomicLong entryProcessorHits = new AtomicLong(); + + /** Number of entry processor invokes on keys, which don't exist in cache. */ + private AtomicLong entryProcessorMisses = new AtomicLong(); + /** Number of writes. 
*/ private AtomicLong writes = new AtomicLong(); @@ -439,6 +463,15 @@ public class CacheMetricsImpl implements CacheMetrics { commitTimeNanos.set(0); rollbackTimeNanos.set(0); + entryProcessorPuts.set(0); + entryProcessorRemovals.set(0); + entryProcessorReadOnlyInvocations.set(0); + entryProcessorMisses.set(0); + entryProcessorHits.set(0); + entryProcessorInvokeTimeNanos.set(0); + entryProcessorMaxInvocationTime.set(0); + entryProcessorMinInvocationTime.set(0); + offHeapGets.set(0); offHeapPuts.set(0); offHeapRemoves.set(0); @@ -495,6 +528,79 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ + @Override public long getEntryProcessorPuts() { + return entryProcessorPuts.get(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorRemovals() { + return entryProcessorRemovals.get(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorReadOnlyInvocations() { + return entryProcessorReadOnlyInvocations.get(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorInvocations() { + return entryProcessorReadOnlyInvocations.get() + entryProcessorPuts.get() + entryProcessorRemovals.get(); + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorHits() { + return entryProcessorHits.get(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorHitPercentage() { + long hits = entryProcessorHits.get(); + long totalInvocations = getEntryProcessorInvocations(); + + if (hits == 0) + return 0; + + return (float) hits / totalInvocations * 100.0f; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorMisses() { + return entryProcessorMisses.get(); + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMissPercentage() { + long misses = entryProcessorMisses.get(); + long totalInvocations = getEntryProcessorInvocations(); + + if (misses == 0) + return 0; + + return (float) misses / totalInvocations * 100.0f; + } + + /** {@inheritDoc} */ + @Override 
public float getEntryProcessorAverageInvocationTime() { + long totalInvokes = getEntryProcessorInvocations(); + long timeNanos = entryProcessorInvokeTimeNanos.get(); + + if (timeNanos == 0 || totalInvokes == 0) + return 0; + + return (1f * timeNanos) / totalInvokes / NANOS_IN_MICROSECOND; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMinInvocationTime() { + return (1f * entryProcessorMinInvocationTime.get()) / NANOS_IN_MICROSECOND; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMaxInvocationTime() { + return (1f * entryProcessorMaxInvocationTime.get()) / NANOS_IN_MICROSECOND; + } + + /** {@inheritDoc} */ @Override public long getCacheRemovals() { return rmCnt.get(); } @@ -554,6 +660,106 @@ public class CacheMetricsImpl implements CacheMetrics { } /** + * Cache invocations caused update callback. + * + * @param isHit Hit or miss flag. + */ + public void onInvokeUpdate(boolean isHit) { + entryProcessorPuts.incrementAndGet(); + + if (isHit) + entryProcessorHits.incrementAndGet(); + else + entryProcessorMisses.incrementAndGet(); + + if (delegate != null) + delegate.onInvokeUpdate(isHit); + } + + /** + * Cache invocations caused removal callback. + * + * @param isHit Hit or miss flag. + */ + public void onInvokeRemove(boolean isHit) { + entryProcessorRemovals.incrementAndGet(); + + if (isHit) + entryProcessorHits.incrementAndGet(); + else + entryProcessorMisses.incrementAndGet(); + + if (delegate != null) + delegate.onInvokeRemove(isHit); + } + + /** + * Read-only cache invocations. + * + * @param isHit Hit or miss flag. + */ + public void onReadOnlyInvoke(boolean isHit) { + entryProcessorReadOnlyInvocations.incrementAndGet(); + + if (isHit) + entryProcessorHits.incrementAndGet(); + else + entryProcessorMisses.incrementAndGet(); + + if (delegate != null) + delegate.onReadOnlyInvoke(isHit); + } + + /** + * Increments invoke operation time nanos. + * + * @param duration Duration. 
+ */ + public void addInvokeTimeNanos(long duration) { + entryProcessorInvokeTimeNanos.addAndGet(duration); + + recalculateInvokeMinTimeNanos(duration); + + recalculateInvokeMaxTimeNanos(duration); + + if (delegate != null) + delegate.addInvokeTimeNanos(duration); + + } + + /** + * Recalculates invoke operation minimum time nanos. + * + * @param duration Duration. + */ + private void recalculateInvokeMinTimeNanos(long duration){ + long minTime = entryProcessorMinInvocationTime.longValue(); + + while (minTime > duration || minTime == 0) { + if (entryProcessorMinInvocationTime.compareAndSet(minTime, duration)) + break; + else + minTime = entryProcessorMinInvocationTime.longValue(); + } + } + + /** + * Recalculates invoke operation maximum time nanos. + * + * @param duration Duration. + */ + private void recalculateInvokeMaxTimeNanos(long duration){ + long maxTime = entryProcessorMaxInvocationTime.longValue(); + + while (maxTime < duration) { + if (entryProcessorMaxInvocationTime.compareAndSet(maxTime, duration)) + break; + else + maxTime = entryProcessorMaxInvocationTime.longValue(); + } + } + + /** * Cache write callback. */ public void onWrite() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 8a0f0e4..5f3001c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -38,6 +38,57 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Number of puts. 
*/ private long puts = 0; + /** Number of invokes caused updates. */ + private long entryProcessorPuts = 0; + + /** Number of invokes caused no updates. */ + private long entryProcessorReadOnlyInvocations = 0; + + /** + * The mean time to execute cache invokes + */ + private float entryProcessorAverageInvocationTime = 0; + + /** + * The total number of cache invocations. + */ + private long entryProcessorInvocations = 0; + + /** + * The total number of cache invocations, caused removal. + */ + private long entryProcessorRemovals = 0; + + /** + * The total number of invocations on keys, which don't exist in cache. + */ + private long entryProcessorMisses = 0; + + /** + * The total number of invocations on keys, which exist in cache. + */ + private long entryProcessorHits = 0; + + /** + * The percentage of invocations on keys, which don't exist in cache. + */ + private float entryProcessorMissPercentage = 0; + + /** + * The percentage of invocations on keys, which exist in cache. + */ + private float entryProcessorHitPercentage = 0; + + /** + * So far, the maximum time to execute cache invokes. + */ + private float entryProcessorMaxInvocationTime = 0; + + /** + * So far, the minimum time to execute cache invokes. + */ + private float entryProcessorMinInvocationTime = 0; + /** Number of hits. 
*/ private long hits = 0; @@ -270,6 +321,18 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { evicts = m.getCacheEvictions(); removes = m.getCacheRemovals(); + entryProcessorPuts = m.getEntryProcessorPuts(); + entryProcessorReadOnlyInvocations = m.getEntryProcessorReadOnlyInvocations(); + entryProcessorInvocations = m.getEntryProcessorInvocations(); + entryProcessorRemovals = m.getEntryProcessorRemovals(); + entryProcessorMisses = m.getEntryProcessorMisses(); + entryProcessorHits = m.getEntryProcessorHits(); + entryProcessorMissPercentage = m.getEntryProcessorMissPercentage(); + entryProcessorHitPercentage = m.getEntryProcessorHitPercentage(); + entryProcessorAverageInvocationTime = m.getEntryProcessorAverageInvocationTime(); + entryProcessorMaxInvocationTime = m.getEntryProcessorMaxInvocationTime(); + entryProcessorMinInvocationTime = m.getEntryProcessorMinInvocationTime(); + putAvgTimeNanos = m.getAveragePutTime(); getAvgTimeNanos = m.getAverageGetTime(); rmvAvgTimeNanos = m.getAverageRemoveTime(); @@ -386,6 +449,18 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { evicts += e.getCacheEvictions(); removes += e.getCacheRemovals(); + entryProcessorPuts = e.getEntryProcessorPuts(); + entryProcessorReadOnlyInvocations = e.getEntryProcessorReadOnlyInvocations(); + entryProcessorInvocations = e.getEntryProcessorInvocations(); + entryProcessorRemovals = e.getEntryProcessorRemovals(); + entryProcessorMisses = e.getEntryProcessorMisses(); + entryProcessorHits = e.getEntryProcessorHits(); + entryProcessorMissPercentage = e.getEntryProcessorMissPercentage(); + entryProcessorHitPercentage = e.getEntryProcessorHitPercentage(); + entryProcessorAverageInvocationTime = e.getEntryProcessorAverageInvocationTime(); + entryProcessorMaxInvocationTime = e.getEntryProcessorMaxInvocationTime(); + entryProcessorMinInvocationTime = e.getEntryProcessorMinInvocationTime(); + putAvgTimeNanos += e.getAveragePutTime(); getAvgTimeNanos 
+= e.getAverageGetTime(); rmvAvgTimeNanos += e.getAverageRemoveTime(); @@ -524,6 +599,61 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public long getEntryProcessorPuts() { + return entryProcessorPuts; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorReadOnlyInvocations() { + return entryProcessorReadOnlyInvocations; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorInvocations() { + return entryProcessorInvocations; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorHits() { + return entryProcessorHits; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorHitPercentage() { + return entryProcessorHitPercentage; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMissPercentage() { + return entryProcessorMissPercentage; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorMisses() { + return entryProcessorMisses; + } + + /** {@inheritDoc} */ + @Override public long getEntryProcessorRemovals() { + return entryProcessorRemovals; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorAverageInvocationTime() { + return entryProcessorAverageInvocationTime; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMinInvocationTime() { + return entryProcessorMinInvocationTime; + } + + /** {@inheritDoc} */ + @Override public float getEntryProcessorMaxInvocationTime() { + return entryProcessorMaxInvocationTime; + } + + /** {@inheritDoc} */ @Override public long getCacheRemovals() { return removes; } @@ -950,6 +1080,18 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { out.writeLong(rebalanceStartTime); out.writeLong(rebalanceFinishTime); out.writeLong(rebalanceClearingPartitionsLeft); + + out.writeLong(entryProcessorPuts); + out.writeFloat(entryProcessorAverageInvocationTime); + out.writeLong(entryProcessorInvocations); + 
out.writeFloat(entryProcessorMaxInvocationTime); + out.writeFloat(entryProcessorMinInvocationTime); + out.writeLong(entryProcessorReadOnlyInvocations); + out.writeFloat(entryProcessorHitPercentage); + out.writeLong(entryProcessorHits); + out.writeLong(entryProcessorMisses); + out.writeFloat(entryProcessorMissPercentage); + out.writeLong(entryProcessorRemovals); } /** {@inheritDoc} */ @@ -1011,5 +1153,20 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { rebalanceStartTime = in.readLong(); rebalanceFinishTime = in.readLong(); rebalanceClearingPartitionsLeft = in.readLong(); + + // 11 long and 5 float values give 108 bytes in total. + if (in.available() >= 108) { + entryProcessorPuts = in.readLong(); + entryProcessorAverageInvocationTime = in.readFloat(); + entryProcessorInvocations = in.readLong(); + entryProcessorMaxInvocationTime = in.readFloat(); + entryProcessorMinInvocationTime = in.readFloat(); + entryProcessorReadOnlyInvocations = in.readLong(); + entryProcessorHitPercentage = in.readFloat(); + entryProcessorHits = in.readLong(); + entryProcessorMisses = in.readLong(); + entryProcessorMissPercentage = in.readFloat(); + entryProcessorRemovals = in.readLong(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index c59e84e..eeed4fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2542,6 +2542,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (topVer != null) 
tx.topologyVersion(topVer); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, key, @@ -2550,6 +2554,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + EntryProcessorResult<T> res = null; if (resMap != null) { @@ -2572,6 +2579,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) throws IgniteCheckedException { @@ -2586,6 +2597,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<K, EntryProcessorResult<T>> res = fut.get().value(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + return res != null ? res : Collections.<K, EntryProcessorResult<T>>emptyMap(); } }); @@ -2602,6 +2616,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() { @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { Map<? 
extends K, EntryProcessor<K, V, Object>> invokeMap = @@ -2624,6 +2642,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { GridCacheReturn ret = fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + Map<K, EntryProcessorResult<T>> resMap = ret.value(); if (resMap != null) { @@ -2647,6 +2668,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { @@ -2674,6 +2699,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { GridCacheReturn ret = fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + assert ret != null; return ret.value() != null ? ret.<Map<K, EntryProcessorResult<T>>>value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); @@ -2690,6 +2718,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(map.keySet()); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) { @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { @@ -2712,6 +2744,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throws IgniteCheckedException { GridCacheReturn ret = fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + assert ret != null; return ret.value() != null ? 
ret.<Map<K, EntryProcessorResult<T>>>value() : Collections.<K, EntryProcessorResult<T>>emptyMap(); @@ -2728,13 +2763,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(map.keySet()); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) { @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); - return fut.get().value(); + Map<K, EntryProcessorResult<T>> value = fut.get().value(); + + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + + return value; } }); } @@ -6192,6 +6236,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * + */ + protected static class InvokeAllTimeStatClosure<T> extends UpdateTimeStatClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param metrics Metrics. + * @param start Start time. + */ + public InvokeAllTimeStatClosure(CacheMetricsImpl metrics, final long start) { + super(metrics, start); + } + + /** {@inheritDoc} */ + @Override protected void updateTimeStat() { + metrics.addInvokeTimeNanos(System.nanoTime() - start); + } + } + + /** * Delayed callable class. 
*/ public static abstract class TopologyVersionAwareJob extends ComputeJobAdapter { http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index fc374bb..aef38e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -450,6 +450,7 @@ public interface GridCacheEntryEx { * @param taskName Task name. * @param updateCntr Update counter. * @param fut Dht atomic future. + * @param transformOp {@code True} if transform operation caused update. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -486,7 +487,8 @@ public interface GridCacheEntryEx { String taskName, @Nullable CacheObject prevVal, @Nullable Long updateCntr, - @Nullable GridDhtAtomicAbstractUpdateFuture fut + @Nullable GridDhtAtomicAbstractUpdateFuture fut, + boolean transformOp ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -506,6 +508,7 @@ public interface GridCacheEntryEx { * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. * @param taskName Task name. + * @param transformOp {@code True} if transform operation caused update. * @return Tuple containing success flag, old value and result for invoke operation. * @throws IgniteCheckedException If update failed. 
* @throws GridCacheEntryRemovedException If entry is obsolete. @@ -525,7 +528,8 @@ public interface GridCacheEntryEx { @Nullable CacheEntryPredicate[] filter, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + boolean transformOp ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 03f6d11..9f9b5c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -89,8 +90,11 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; +import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP; +import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** @@ -1045,9 +1049,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme recordNodeId(affNodeId, topVer); - if (metrics && cctx.statisticsEnabled()) + if (metrics && cctx.statisticsEnabled()) { cctx.cache().metrics0().onWrite(); + T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue(); + + if (entryProcRes != null && UPDATE.equals(entryProcRes.get1())) + cctx.cache().metrics0().onInvokeUpdate(old != null); + } + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { CacheObject evtOld = cctx.unwrapTemporary(old); @@ -1230,9 +1240,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme drReplicate(drType, null, newVer, topVer); - if (metrics && cctx.statisticsEnabled()) + if (metrics && cctx.statisticsEnabled()) { cctx.cache().metrics0().onRemove(); + T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue(); + + if (entryProcRes != null && DELETE.equals(entryProcRes.get1())) + cctx.cache().metrics0().onInvokeRemove(old != null); + } + if (tx == null) obsoleteVer = newVer; else { @@ -1358,7 +1374,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable CacheEntryPredicate[] filter, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + boolean transformOp ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.isLocal() && cctx.atomic(); @@ -1479,6 +1496,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter 
impleme try { Object computed = entryProcessor.process(entry, invokeArgs); + transformOp = true; + if (entry.modified()) { updated0 = cctx.unwrapTemporary(entry.getValue()); @@ -1503,6 +1522,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (expiryPlc != null && !readFromStore && hasValueUnlocked()) updateTtl(expiryPlc); + updateMetrics(READ, metrics, transformOp, old != null); + return new GridTuple3<>(false, null, invokeRes); } } @@ -1564,9 +1585,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (ttl == CU.TTL_ZERO) + if (ttl == CU.TTL_ZERO) { op = GridCacheOperation.DELETE; + //If time expired no transformation needed. + transformOp = false; + } + // Try write-through. if (op == GridCacheOperation.UPDATE) { // Detach value before index update. @@ -1634,7 +1659,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (res) - updateMetrics(op, metrics); + updateMetrics(op, metrics, transformOp, old != null); + else if (op == DELETE && transformOp) + cctx.cache().metrics0().onInvokeRemove(old != null); if (lsnrCol != null) { long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null); @@ -1705,7 +1732,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme final String taskName, @Nullable final CacheObject prevVal, @Nullable final Long updateCntr, - @Nullable final GridDhtAtomicAbstractUpdateFuture fut + @Nullable final GridDhtAtomicAbstractUpdateFuture fut, + boolean transformOp ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic() && !detached(); @@ -1769,13 +1797,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject updateVal = null; GridCacheVersion updateVer = c.newVer; + boolean updateMetrics = metrics && cctx.statisticsEnabled(); + // Apply metrics. 
- if (metrics && + if (updateMetrics && updateRes.outcome().updateReadMetrics() && - cctx.statisticsEnabled() && needVal) cctx.cache().metrics0().onRead(oldVal != null); + if (updateMetrics && INVOKE_NO_OP.equals(updateRes.outcome()) && (transformOp || updateRes.transformed())) + cctx.cache().metrics0().onReadOnlyInvoke(oldVal != null); + else if (updateMetrics && REMOVE_NO_VAL.equals(updateRes.outcome()) + && (transformOp || updateRes.transformed())) + cctx.cache().metrics0().onInvokeRemove(oldVal != null); + switch (updateRes.outcome()) { case VERSION_CHECK_FAILED: { if (!cctx.isNear()) { @@ -1913,7 +1948,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (updateRes.success()) - updateMetrics(c.op, metrics); + updateMetrics(c.op, metrics, transformOp || updateRes.transformed(), oldVal != null); // Continuous query filter should be perform under lock. if (lsnrs != null) { @@ -4225,13 +4260,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * * @param op Operation. * @param metrics Update merics flag. + * @param transformed {@code True} if transform operation caused update. + * @param hasOldVal {@code True} if entry has old value. 
*/ - private void updateMetrics(GridCacheOperation op, boolean metrics) { + private void updateMetrics(GridCacheOperation op, boolean metrics, boolean transformed, boolean hasOldVal) { if (metrics && cctx.statisticsEnabled()) { - if (op == GridCacheOperation.DELETE) + if (op == GridCacheOperation.DELETE) { cctx.cache().metrics0().onRemove(); - else + + if (transformed) + cctx.cache().metrics0().onInvokeRemove(hasOldVal); + } else if (op == READ && transformed) + cctx.cache().metrics0().onReadOnlyInvoke(hasOldVal); + else { cctx.cache().metrics0().onWrite(); + + if (transformed) + cctx.cache().metrics0().onInvokeUpdate(hasOldVal); + } } } @@ -4699,12 +4745,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean invoke = op == TRANSFORM; + boolean transformed = false; + if (invoke) { invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry); invokeRes = runEntryProcessor(invokeEntry); op = writeObj == null ? DELETE : UPDATE; + + transformed = true; } CacheObject newVal = (CacheObject)writeObj; @@ -4747,7 +4797,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); return; } @@ -4765,7 +4816,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + true); return; } @@ -4784,7 +4836,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); return; } @@ -4805,12 +4858,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (op == UPDATE) { assert writeObj != null; - update(conflictCtx, invokeRes, storeLoadedVal != null); + update(conflictCtx, invokeRes, storeLoadedVal != null, transformed); } else { assert op == DELETE && writeObj == null : op; - remove(conflictCtx, invokeRes, storeLoadedVal != null); + remove(conflictCtx, 
invokeRes, storeLoadedVal != null, transformed); } assert updateRes != null && treeOp != null; @@ -4934,11 +4987,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param conflictCtx Conflict context. * @param invokeRes Entry processor result (for invoke operation). * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. + * @param transformed {@code True} if update caused by transformation operation. * @throws IgniteCheckedException If failed. */ private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, @Nullable IgniteBiTuple<Object, Exception> invokeRes, - boolean readFromStore) + boolean readFromStore, + boolean transformed) throws IgniteCheckedException { GridCacheContext cctx = entry.context(); @@ -4979,7 +5034,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme writeObj = null; - remove(conflictCtx, invokeRes, readFromStore); + remove(conflictCtx, invokeRes, readFromStore, false); return; } @@ -5018,7 +5073,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); return; } @@ -5081,19 +5137,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme newSysExpireTime, null, conflictCtx, - updateCntr0); + updateCntr0, + transformed); } /** * @param conflictCtx Conflict context. * @param invokeRes Entry processor result (for invoke operation). * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store. + * @param transformed {@code True} if remove caused by tranformation operation. * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx, @Nullable IgniteBiTuple<Object, Exception> invokeRes, - boolean readFromStore) + boolean readFromStore, + boolean transformed) throws IgniteCheckedException { GridCacheContext cctx = entry.context(); @@ -5123,7 +5182,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); return; } @@ -5178,7 +5238,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_CALCULATE, enqueueVer, conflictCtx, - updateCntr0); + updateCntr0, + transformed); } /** @@ -5259,7 +5320,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); } // Will update something. else { @@ -5325,7 +5387,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - 0); + 0, + false); } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 97cb534..0bf1c57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult { /** */ private final long updateCntr; + /** Flag indicating whether value is transformed. */ + private boolean transformed; + /** Value computed by entry processor. 
*/ private IgniteBiTuple<Object, Exception> res; @@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult { * @param rmvVer Version for deferred delete. * @param conflictRes DR resolution result. * @param updateCntr Partition update counter. + * @param transformed {@code True} if result was transformed. */ GridCacheUpdateAtomicResult(UpdateOutcome outcome, @Nullable CacheObject oldVal, @@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult { long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, - long updateCntr) { + long updateCntr, + boolean transformed) { assert outcome != null; this.outcome = outcome; @@ -93,6 +98,7 @@ public class GridCacheUpdateAtomicResult { this.rmvVer = rmvVer; this.conflictRes = conflictRes; this.updateCntr = updateCntr; + this.transformed = transformed; } /** @@ -240,6 +246,13 @@ public class GridCacheUpdateAtomicResult { } } + /** + * @return {@code True} if transformed. + */ + public boolean transformed() { + return transformed; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheUpdateAtomicResult.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index d02b851..ba1210e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -440,6 +440,11 @@ public final class GridDhtTxPrepareFuture 
extends GridCacheCompoundFuture<Ignite GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; if (op == NOOP) { + GridCacheAdapter<?, ?> cache = writeEntry.context().cache(); + + if (cache.context().statisticsEnabled()) + cache.metrics0().onReadOnlyInvoke(oldVal != null); + if (expiry != null) { long ttl = CU.toTtl(expiry.getExpiryForAccess()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index e8824e7..c04e1af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -161,6 +162,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA * @param addPrevVal If {@code true} sends previous value to 
backups. * @param prevVal Previous value. * @param updateCntr Partition update counter. + * @param cacheOp Corresponding cache operation. */ @SuppressWarnings("ForLoopReplaceableByForEach") final void addWriteEntry( @@ -173,7 +175,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr) { + long updateCntr, + GridCacheOperation cacheOp) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); List<ClusterNode> affNodes = affAssignment.get(entry.partition()); @@ -222,7 +225,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureA conflictVer, addPrevVal, prevVal, - updateCntr); + updateCntr, + cacheOp); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index a50b68c..a5e9feb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import 
org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -59,6 +60,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess /** */ protected static final int DHT_ATOMIC_OBSOLETE_NEAR_KEY_FLAG_MASK = 0x20; + /** Flag indicating transformation operation was performed. */ + protected static final int DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK = 0x40; + /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); @@ -225,6 +229,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess } /** + * @return {@code True} if transformation operation was performed. + */ + public final boolean transformOperation() { + return isFlag(DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK); + } + + /** * @return {@code True} if on response flag changed. */ public boolean onResponse() { @@ -268,6 +279,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess * @param addPrevVal If {@code true} adds previous value. * @param prevVal Previous value. * @param updateCntr Update counter. + * @param cacheOp Corresponding cache operation. */ public abstract void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -277,8 +289,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMess @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr - ); + long updateCntr, + GridCacheOperation cacheOp); /** * @param key Key to add. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8408b32..c39842e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -820,6 +820,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKey(key); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + CacheOperationContext opCtx = ctx.operationContextPerCall(); final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); @@ -838,6 +842,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { throws IgniteCheckedException { Map<K, EntryProcessorResult<T>> resMap = fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + if (resMap != null) { assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); @@ -885,6 +892,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(keys); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + Map<? 
extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { @Override public EntryProcessor apply(K k) { return entryProcessor; @@ -912,6 +923,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) throws IgniteCheckedException { Map<Object, EntryProcessorResult> resMap = (Map)fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + return ctx.unwrapInvokeResult(resMap, keepBinary); } }); @@ -926,7 +940,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(map.keySet()); - return (Map<K, EntryProcessorResult<T>>)updateAll0(null, + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + + Map<K, EntryProcessorResult<T>> updateResults = (Map<K, EntryProcessorResult<T>>) updateAll0(null, map, args, null, @@ -935,6 +953,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, TRANSFORM, false).get(); + + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + + return updateResults; } /** {@inheritDoc} */ @@ -947,7 +970,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (keyCheck) validateCacheKeys(map.keySet()); - return updateAll0(null, + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? 
System.nanoTime() : 0L; + + IgniteInternalFuture updateResults = updateAll0(null, map, args, null, @@ -956,6 +983,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, TRANSFORM, true); + + if (statsEnabled) + updateResults.listen(new InvokeAllTimeStatClosure(metrics0(), start)); + + return updateResults; } /** @@ -2077,9 +2109,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { curInvokeRes = CacheInvokeResult.fromResult(computed); } - if (!invokeEntry.modified()) + if (!invokeEntry.modified()) { + if (ctx.statisticsEnabled()) + ctx.cache().metrics0().onReadOnlyInvoke(old != null); + continue; - else { + } else { updatedVal = ctx.unwrapTemporary(invokeEntry.getValue()); updated = ctx.toCacheObject(updatedVal); @@ -2461,7 +2496,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, /*prevVal*/null, /*updateCntr*/null, - dhtFut); + dhtFut, + false); if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. @@ -2484,7 +2520,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { newConflictVer, sndPrevVal, updRes.oldValue(), - updRes.updateCounter()); + updRes.updateCounter(), + op); if (readers != null) dhtFut.addNearWriteEntries( @@ -2687,6 +2724,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Get readers before innerUpdate (reader cleared after remove). GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); + EntryProcessor<Object, Object, Object> entryProcessor = + entryProcessorMap == null ? 
null : entryProcessorMap.get(entry.key()); + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nearNode.id(), @@ -2715,7 +2755,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, null, null, - dhtFut); + dhtFut, + entryProcessor != null); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2740,8 +2781,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { batchRes.addDeleted(entry, updRes, entries); if (dhtFut != null) { - EntryProcessor<Object, Object, Object> entryProcessor = - entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); dhtFut.addWriteEntry( affAssignment, @@ -2753,7 +2792,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, sndPrevVal, updRes.oldValue(), - updRes.updateCounter()); + updRes.updateCounter(), + op); if (readers != null) dhtFut.addNearWriteEntries( @@ -3201,7 +3241,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, prevVal, updateIdx, - null); + null, + req.transformOperation()); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index 0ade243..19b24b0 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -26,6 +26,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -38,6 +39,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + /** * */ @@ -118,6 +121,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat * @param addPrevVal If {@code true} adds previous value. * @param prevVal Previous value. * @param updateCntr Update counter. + * @param cacheOp Corresponding cache operation. 
*/ @Override public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -127,8 +131,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr - ) { + long updateCntr, + GridCacheOperation cacheOp) { assert entryProcessor == null; assert ttl <= 0 : ttl; assert conflictExpireTime <= 0 : conflictExpireTime; @@ -144,6 +148,9 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat this.prevVal = prevVal; this.updateCntr = updateCntr; + + if (cacheOp == TRANSFORM) + setFlag(true, DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 6f3f530..31439d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -201,8 +202,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque @Nullable GridCacheVersion conflictVer, boolean addPrevVal, @Nullable CacheObject prevVal, - long updateCntr - ) { + long updateCntr, + GridCacheOperation cacheOp) { assert key.partition() >= 0 : key; keys.add(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 2f832b4..23c2480 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -197,7 +197,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { req.keepBinary(), req.nodeId(), req.subjectId(), - taskName); + taskName, + req.operation() == TRANSFORM); } catch (IgniteCheckedException e) { res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); @@ -214,6 +215,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { * @param nodeId Node ID. * @param subjId Subject ID. * @param taskName Task name. + * @param transformedValue {@code True} if transformed value. * @throws IgniteCheckedException If failed. 
*/ private void processNearAtomicUpdateResponse( @@ -225,8 +227,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { boolean keepBinary, UUID nodeId, UUID subjId, - String taskName - ) throws IgniteCheckedException { + String taskName, + boolean transformedValue) throws IgniteCheckedException { try { while (true) { GridCacheEntryEx entry = null; @@ -266,7 +268,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { taskName, null, null, - null); + null, + transformedValue); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -365,7 +368,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { taskName, null, null, - null); + null, + false); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a9d7fa1..b96dbdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -560,6 +560,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (keyCheck) validateCacheKeys(keys); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + Map<? 
extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { @Override public EntryProcessor apply(K k) { return entryProcessor; @@ -570,17 +574,23 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM, - invokeMap.keySet(), - invokeMap.values(), - args, - expiryPerCall(), - false, - false, - null, - ctx.writeThrough(), - ctx.readThrough(), - keepBinary); + Map<K, EntryProcessorResult<T>> entryProcessorRes = (Map<K, EntryProcessorResult<T>>) updateAllInternal( + TRANSFORM, + invokeMap.keySet(), + invokeMap.values(), + args, + expiryPerCall(), + false, + false, + null, + ctx.writeThrough(), + ctx.readThrough(), + keepBinary); + + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + + return entryProcessorRes; } /** {@inheritDoc} */ @@ -593,6 +603,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (keyCheck) validateCacheKey(key); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + Map<? extends K, EntryProcessor> invokeMap = Collections.singletonMap(key, (EntryProcessor)entryProcessor); @@ -608,6 +622,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { throws IgniteCheckedException { Map<K, EntryProcessorResult<T>> resMap = fut.get(); + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + if (resMap != null) { assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); @@ -630,18 +647,27 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (keyCheck) validateCacheKeys(keys); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + Map<? 
extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { @Override public EntryProcessor apply(K k) { return entryProcessor; } }); - return updateAllAsync0(null, + IgniteInternalFuture fut = updateAllAsync0(null, invokeMap, args, true, false, null); + + if (statsEnabled) + fut.listen(new InvokeAllTimeStatClosure(metrics0(), start)); + + return fut; } /** {@inheritDoc} */ @@ -654,19 +680,29 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (keyCheck) validateCacheKeys(map.keySet()); + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + CacheOperationContext opCtx = ctx.operationContextPerCall(); - return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM, - map.keySet(), - map.values(), - args, - expiryPerCall(), - false, - false, - null, - ctx.writeThrough(), - ctx.readThrough(), - opCtx != null && opCtx.isKeepBinary()); + Map<K, EntryProcessorResult<T>> entryProcessorResult = (Map<K, EntryProcessorResult<T>>) updateAllInternal( + TRANSFORM, + map.keySet(), + map.values(), + args, + expiryPerCall(), + false, + false, + null, + ctx.writeThrough(), + ctx.readThrough(), + opCtx != null && opCtx.isKeepBinary()); + + if (statsEnabled) + metrics0().addInvokeTimeNanos(System.nanoTime() - start); + + return entryProcessorResult; } /** {@inheritDoc} */ @@ -679,12 +715,21 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (keyCheck) validateCacheKeys(map.keySet()); - return updateAllAsync0(null, + final boolean statsEnabled = ctx.statisticsEnabled(); + + final long start = statsEnabled ? 
System.nanoTime() : 0L; + + IgniteInternalFuture fut = updateAllAsync0(null, map, args, true, false, null); + + if (statsEnabled) + fut.listen(new InvokeAllTimeStatClosure(metrics0(), start)); + + return fut; } /** @@ -893,7 +938,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { filters, intercept, subjId, - taskName); + taskName, + false); if (op == TRANSFORM) { if (t.get3() != null) { @@ -1076,7 +1122,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { validation = true; ctx.validateKeyAndValue(entry.key(), updated); - } + } else if (ctx.statisticsEnabled() && !invokeEntry.modified()) + ctx.cache().metrics0().onReadOnlyInvoke(old != null); } catch (Exception e) { invokeRes = CacheInvokeResult.fromError(e); @@ -1115,7 +1162,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { keepBinary, err, subjId, - taskName); + taskName, + true); putMap = null; writeVals = null; @@ -1153,7 +1201,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { keepBinary, err, subjId, - taskName); + taskName, + true); rmvKeys = null; @@ -1258,7 +1307,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { keepBinary, err, subjId, - taskName); + taskName, + op == TRANSFORM); } else assert filtered.isEmpty(); @@ -1283,6 +1333,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { * @param err Optional partial update exception. * @param subjId Subject ID. * @param taskName Task name. + * @param transformed {@code True} if transform operation performed. * @return Partial update exception. 
*/ @SuppressWarnings({"unchecked", "ConstantConditions", "ForLoopReplaceableByForEach"}) @@ -1296,8 +1347,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { boolean keepBinary, @Nullable CachePartialUpdateCheckedException err, UUID subjId, - String taskName - ) { + String taskName, + boolean transformed) { assert putMap == null ^ rmvKeys == null; GridCacheOperation op; @@ -1373,7 +1424,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { null, false, subjId, - taskName); + taskName, + transformed); if (intercept) { if (op == UPDATE) http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 3cf1146..15df637 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1578,6 +1578,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement GridCacheOperation op = modified ? (cacheVal == null ? DELETE : UPDATE) : NOOP; + txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? null : cacheVal)); + if (op == NOOP) { ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
