IGNITE-950 - Fixing RemoveAll()
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e0a1a660 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e0a1a660 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e0a1a660 Branch: refs/heads/ignite-1753-1282 Commit: e0a1a6602f7d7cfc6039a8868c69ca7690f1a2c1 Parents: 8585cce Author: Alexey Goncharuk <[email protected]> Authored: Tue Nov 3 17:08:09 2015 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Nov 3 17:08:09 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 40 ++++++++++++++------ .../datastreamer/DataStreamProcessor.java | 2 - .../datastreamer/DataStreamerImpl.java | 4 -- .../platform/cache/PlatformCache.java | 2 - .../datastreamer/PlatformDataStreamer.java | 2 - 5 files changed, 29 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 5aace6e..77035df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -169,6 +169,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter boolean skipStore = opCtx != null && opCtx.skipStore(); + boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + do { retry = false; @@ -181,7 +183,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); retry = !ctx.kernalContext().task().execute( - new RemoveAllTask(ctx.name(), topVer, skipStore), null).get(); + new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null).get(); } } while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry); @@ -200,9 +202,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter CacheOperationContext opCtx = ctx.operationContextPerCall(); - boolean skipStore = opCtx != null && opCtx.skipStore(); - - removeAllAsync(opFut, topVer, skipStore); + removeAllAsync(opFut, topVer, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.isKeepBinary()); return opFut; } @@ -212,15 +212,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param topVer Topology version. * @param skipStore Skip store flag. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer, - final boolean skipStore) { + private void removeAllAsync( + final GridFutureAdapter<Void> opFut, + final AffinityTopologyVersion topVer, + final boolean skipStore, + final boolean keepBinary + ) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute( - new RemoveAllTask(ctx.name(), topVer, skipStore), null); + new RemoveAllTask(ctx.name(), topVer, skipStore, keepBinary), null); rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> fut) { @@ -232,7 +236,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (topVer0.equals(topVer) && !retry) opFut.onDone(); else - removeAllAsync(opFut, topVer0, skipStore); + removeAllAsync(opFut, topVer0, skipStore, keepBinary); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -277,15 +281,19 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** Skip store flag. */ private final boolean skipStore; + /** Keep binary flag. */ + private final boolean keepBinary; + /** * @param cacheName Cache name. * @param topVer Affinity topology version. * @param skipStore Skip store flag. */ - public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) { + public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore, boolean keepBinary) { this.cacheName = cacheName; this.topVer = topVer; this.skipStore = skipStore; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @@ -294,7 +302,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) - jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node); + jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary), node); return jobs; } @@ -335,15 +343,24 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** Skip store flag. */ private final boolean skipStore; + /** Keep binary flag. */ + private final boolean keepBinary; + /** * @param cacheName Cache name. * @param topVer Topology version. * @param skipStore Skip store flag. */ - private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { + private GlobalRemoveAllJob( + String cacheName, + @NotNull AffinityTopologyVersion topVer, + boolean skipStore, + boolean keepBinary + ) { super(cacheName, topVer); this.skipStore = skipStore; + this.keepBinary = keepBinary; } /** {@inheritDoc} */ @@ -378,6 +395,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter ((DataStreamerImpl) dataLdr).maxRemapCount(0); dataLdr.skipStore(skipStore); + dataLdr.keepBinary(keepBinary); dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 14cb1c8..a2aab77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -283,8 +283,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { Collection<DataStreamerEntry> col = req.entries(); - U.debug(log, "Processing data streamer update request [keepBinary=" + req.keepBinary() + ", keys=" + col + ']'); - DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx, log, req.cacheName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index df1f656..27eff0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1255,8 +1255,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed IgniteInternalFuture<Object> fut; if (isLocNode) { - U.dumpStack("Submitting local streamer job [entries=" + entries + ", keepBinary=" + keepBinary + ']'); - fut = ctx.closure().callLocalSafe( new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false); @@ -1338,8 +1336,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (topVer == null) topVer = ctx.cache().context().exchange().readyAffinityVersion(); - U.debug(log, "Creating data streamer request [keepBinary=" + keepBinary + ']'); - DataStreamerRequest req = new DataStreamerRequest( reqId, topicBytes, http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index d3588ee..6ec52d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -720,8 +720,6 @@ public class PlatformCache extends PlatformAbstractTarget { * @throws org.apache.ignite.IgniteCheckedException In case of error. */ public void removeAll() throws IgniteCheckedException { - U.debug(log, "Will removeAll on platform cache: " + cache.operationContext()); - cache.removeAll(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e0a1a660/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index 9caa913..794ab0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -85,8 +85,6 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { this.cacheName = cacheName; this.ldr = ldr; this.keepPortable = keepPortable; - - U.debug(log, "Created platform streamer [keepBinary=" + ldr.keepBinary() + ']'); } /** {@inheritDoc} */
