This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8e5210b IGNITE-11989: remove cache preload predicate (#6699)
8e5210b is described below
commit 8e5210be6e6c3fa6d3174fbf2140734d3313b1be
Author: Maxim Muzafarov <[email protected]>
AuthorDate: Mon Jul 22 08:55:12 2019 +0300
IGNITE-11989: remove cache preload predicate (#6699)
---
.../processors/cache/GridCachePreloader.java | 12 ----
.../cache/GridCachePreloaderAdapter.java | 14 -----
.../dht/preloader/GridDhtPartitionDemander.java | 69 ++++++++--------------
.../dht/preloader/GridDhtPartitionSupplier.java | 21 +------
.../dht/preloader/GridDhtPreloader.java | 12 ----
5 files changed, 27 insertions(+), 101 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 3eac9b0..6570c30 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -33,7 +33,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
/**
@@ -97,17 +96,6 @@ public interface GridCachePreloader {
@Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut);
/**
- * @param p Preload predicate.
- */
- public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> p);
-
- /**
- * @return Preload predicate. If not {@code null}, will evaluate each
preloaded entry during
- * send and receive, and if predicate evaluates to {@code false},
entry will be skipped.
- */
- public IgnitePredicate<GridCacheEntryInfo> preloadPredicate();
-
- /**
* @return Future which will complete when preloader is safe to use.
*/
public IgniteInternalFuture<Object> startFuture();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 3e52a23..1998cfa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -32,7 +32,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_REBALANCE_BATCHES_PREFETCH_COUNT;
@@ -56,9 +55,6 @@ public class GridCachePreloaderAdapter implements
GridCachePreloader {
/** Start future (always completed by default). */
private final IgniteInternalFuture finFut;
- /** Preload predicate. */
- protected IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
/**
* @param grp Cache group.
*/
@@ -100,16 +96,6 @@ public class GridCachePreloaderAdapter implements
GridCachePreloader {
}
/** {@inheritDoc} */
- @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo>
preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /** {@inheritDoc} */
- @Override public IgnitePredicate<GridCacheEntryInfo> preloadPredicate() {
- return preloadPred;
- }
-
- /** {@inheritDoc} */
@Override public IgniteInternalFuture<Object> startFuture() {
return finFut;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0cf5aba..975b02a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -73,7 +73,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import org.jetbrains.annotations.Nullable;
@@ -98,9 +97,6 @@ public class GridDhtPartitionDemander {
/** */
private final IgniteLogger log;
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
/** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
@GridToStringInclude
private final GridFutureAdapter syncFut = new GridFutureAdapter();
@@ -188,15 +184,6 @@ public class GridDhtPartitionDemander {
}
/**
- * Sets preload predicate for demand pool.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
* @return Rebalance future.
*/
IgniteInternalFuture<Boolean> forceRebalance() {
@@ -1040,38 +1027,34 @@ public class GridDhtPartitionDemander {
from.id() + ", grpId=" + grp.groupId() + ']');
}
- if (preloadPred == null || preloadPred.apply(entry)) {
- if (cached.initialValue(
- entry.value(),
- entry.version(),
- cctx.mvccEnabled() ?
((MvccVersionAware)entry).mvccVersion() : null,
- cctx.mvccEnabled() ?
((MvccUpdateVersionAware)entry).newMvccVersion() : null,
- cctx.mvccEnabled() ?
((MvccVersionAware)entry).mvccTxState() : TxState.NA,
- cctx.mvccEnabled() ?
((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA,
- entry.ttl(),
- entry.expireTime(),
- true,
- topVer,
- cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
- false
- )) {
- cached.touch(); // Start tracking.
-
- if
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) &&
!cached.isInternal())
- cctx.events().addEvent(cached.partition(),
cached.key(), cctx.localNodeId(), null,
- null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED,
entry.value(), true, null,
- false, null, null, null, true);
- }
- else {
- cached.touch(); // Start tracking.
+ if (cached.initialValue(
+ entry.value(),
+ entry.version(),
+ cctx.mvccEnabled() ?
((MvccVersionAware)entry).mvccVersion() : null,
+ cctx.mvccEnabled() ?
((MvccUpdateVersionAware)entry).newMvccVersion() : null,
+ cctx.mvccEnabled() ?
((MvccVersionAware)entry).mvccTxState() : TxState.NA,
+ cctx.mvccEnabled() ?
((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA,
+ entry.ttl(),
+ entry.expireTime(),
+ true,
+ topVer,
+ cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+ false
+ )) {
+ cached.touch(); // Start tracking.
- if (log.isTraceEnabled())
- log.trace("Rebalancing entry is already in cache
(will ignore) [key=" + cached.key() +
- ", part=" + p + ']');
- }
+ if
(cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) &&
!cached.isInternal())
+ cctx.events().addEvent(cached.partition(),
cached.key(), cctx.localNodeId(), null,
+ null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED,
entry.value(), true, null,
+ false, null, null, null, true);
+ }
+ else {
+ cached.touch(); // Start tracking.
+
+ if (log.isTraceEnabled())
+ log.trace("Rebalancing entry is already in cache (will
ignore) [key=" + cached.key() +
+ ", part=" + p + ']');
}
- else if (log.isTraceEnabled())
- log.trace("Rebalance predicate evaluated to false for
entry (will ignore): " + entry);
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isTraceEnabled())
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 520e8f6..43907aa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;
import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
@@ -71,9 +70,6 @@ class GridDhtPartitionSupplier {
/** */
private GridDhtPartitionTopology top;
- /** Preload predicate. */
- private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
/** Supply context map. T3: nodeId, topicId, topVer. */
private final Map<T3<UUID, Integer, AffinityTopologyVersion>,
SupplyContext> scMap = new HashMap<>();
@@ -166,15 +162,6 @@ class GridDhtPartitionSupplier {
}
/**
- * Sets preload predicate for this supplier.
- *
- * @param preloadPred Preload predicate.
- */
- void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
- this.preloadPred = preloadPred;
- }
-
- /**
* For each demand message method lookups (or creates new) supply context
and starts to iterate entries across requested partitions.
* Each entry in iterator is placed to prepared supply message.
*
@@ -389,13 +376,7 @@ class GridDhtPartitionSupplier {
if (info == null)
continue;
- if (preloadPred == null || preloadPred.apply(info))
- supplyMsg.addEntry0(part, iter.historical(part), info,
grp.shared(), grp.cacheObjectContext());
- else {
- if (log.isTraceEnabled())
- log.trace("Rebalance predicate evaluated to false
(will not send " +
- "cache entry): " + info);
- }
+ supplyMsg.addEntry0(part, iter.historical(part), info,
grp.shared(), grp.cacheObjectContext());
if (iter.isPartitionDone(part)) {
supplyMsg.last(part, loc.updateCounter());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index b57e062..94784f0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -34,7 +34,6 @@ import
org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -48,7 +47,6 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
@@ -116,16 +114,6 @@ public class GridDhtPreloader extends
GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo>
preloadPred) {
- super.preloadPredicate(preloadPred);
-
- assert supplier != null && demander != null : "preloadPredicate may be
called only after start()";
-
- supplier.preloadPredicate(preloadPred);
- demander.preloadPredicate(preloadPred);
- }
-
- /** {@inheritDoc} */
@Override public void onKernalStop() {
if (log.isDebugEnabled())
log.debug("DHT rebalancer onKernalStop callback.");