This is an automated email from the ASF dual-hosted git repository.
sboikov pushed a commit to branch ignite-11704
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-11704 by this push:
new e6a793b ignite-11704
e6a793b is described below
commit e6a793bf4caf897453f40ad4df977ea7268dea4c
Author: sboikov <[email protected]>
AuthorDate: Fri Jul 19 21:20:51 2019 +0300
ignite-11704
---
.../processors/cache/CacheGroupContext.java | 3 +-
.../processors/cache/GridCacheMapEntry.java | 88 ++++++++++++++++++-
.../cache/IgniteCacheOffheapManager.java | 5 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 26 +++++-
.../dht/topology/GridDhtLocalPartition.java | 99 +++++++++++++++++++++-
.../cache/persistence/GridCacheOffheapManager.java | 8 +-
.../distributed/CacheRemoveWithTombstonesTest.java | 39 +++++++--
7 files changed, 250 insertions(+), 18 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 4af5de5..7963893 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
@@ -1307,7 +1308,7 @@ public class CacheGroupContext {
}
public boolean createTombstone(@Nullable GridDhtLocalPartition part) {
- return part != null && supportsTombstone();
+ return part != null && supportsTombstone() && part.state() ==
GridDhtPartitionState.MOVING;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index adc8699..08986a9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1717,8 +1717,12 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
}
}
- if (cctx.group().createTombstone(localPartition()))
- cctx.offheap().removeWithTombstone(cctx, key, newVer,
partition(), localPartition());
+ if (cctx.group().createTombstone(localPartition())) {
+ cctx.offheap().removeWithTombstone(cctx, key, newVer,
localPartition());
+
+ if (!cctx.group().createTombstone(localPartition()))
+ removeTombstone0(newVer);
+ }
else
removeValue();
@@ -2818,6 +2822,34 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
}
/**
+ * @param tombstoneVer Version of the tombstone row to remove; handed to {@code removeTombstone0} while the entry lock is held.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void removeTombstone(GridCacheVersion tombstoneVer) throws
GridCacheEntryRemovedException, IgniteCheckedException {
+ lockEntry();
+
+ try {
+ checkObsolete();
+
+ removeTombstone0(tombstoneVer);
+ }
+ finally {
+ unlockEntry();
+ }
+ }
+
+ /**
+ * @param tombstoneVer Version the stored row must have to be removed; a missing row or a row with another version is left as is.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void removeTombstone0(GridCacheVersion tombstoneVer) throws
IgniteCheckedException {
+ RemoveClosure closure = new RemoveClosure(this, tombstoneVer);
+
+ cctx.offheap().invoke(cctx, key, localPartition(), closure);
+ }
+
+ /**
* @return {@code True} if this entry should not be evicted from cache.
*/
protected boolean evictionDisabled() {
@@ -5720,6 +5752,58 @@ public abstract class GridCacheMapEntry extends
GridMetadataAwareAdapter impleme
/**
*
*/
+ private static class RemoveClosure implements
IgniteCacheOffheapManager.OffheapInvokeClosure {
+ /** Entry whose key is set on the row selected for removal. */
+ private final GridCacheMapEntry entry;
+
+ /** Version the existing row must have for the removal to happen. */
+ private final GridCacheVersion ver;
+
+ /** Operation decided by {@link #call}: {@code NOOP} or {@code REMOVE}. */
+ private IgniteTree.OperationType op;
+
+ /** Row captured by {@link #call} when {@code REMOVE} is chosen; otherwise stays {@code null}. */
+ private CacheDataRow oldRow;
+
+ public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) {
+ this.entry = entry;
+ this.ver = ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable CacheDataRow oldRow() {
+ return oldRow;
+ }
+
+ /** {@inheritDoc} Chooses {@code NOOP} when the row is missing or its version differs from {@code ver}; otherwise {@code REMOVE}. */
+ @Override public void call(@Nullable CacheDataRow row) throws
IgniteCheckedException {
+ if (row == null || !ver.equals(row.version())) {
+ op = IgniteTree.OperationType.NOOP;
+
+ return;
+ }
+
+ row.key(entry.key);
+
+ oldRow = row;
+
+ op = IgniteTree.OperationType.REMOVE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow newRow() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTree.OperationType operationType() {
+ return op;
+ }
+ }
+
+ /**
+ *
+ */
private static class UpdateClosure implements
IgniteCacheOffheapManager.OffheapInvokeClosure {
/** */
private final GridCacheMapEntry entry;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index c11e909..c883343 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -406,7 +406,6 @@ public interface IgniteCacheOffheapManager {
GridCacheContext cctx,
KeyCacheObject key,
GridCacheVersion ver,
- int partId,
GridDhtLocalPartition part
) throws IgniteCheckedException;
@@ -454,6 +453,8 @@ public interface IgniteCacheOffheapManager {
*/
public GridIterator<CacheDataRow> partitionIterator(final int part,
boolean withTombstones) throws IgniteCheckedException;
+ public GridIterator<CacheDataRow> tombstonesIterator(final int part)
throws IgniteCheckedException;
+
/**
* @param part Partition number.
* @param topVer Topology version.
@@ -917,7 +918,7 @@ public interface IgniteCacheOffheapManager {
* @param partId Partition number.
* @throws IgniteCheckedException If failed.
*/
- public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject
key, GridCacheVersion ver, int partId) throws IgniteCheckedException;
+ public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject
key, GridCacheVersion ver, GridDhtLocalPartition part) throws
IgniteCheckedException;
/**
* @param cctx Cache context.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c45e3b1..3597de7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -632,9 +632,10 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
GridCacheContext cctx,
KeyCacheObject key,
GridCacheVersion ver,
- int partId,
GridDhtLocalPartition part) throws IgniteCheckedException {
- dataStore(part).removeWithTombstone(cctx, key, ver, partId);
+ assert part != null;
+
+ dataStore(part).removeWithTombstone(cctx, key, ver, part);
}
@Override public boolean isTombstone(CacheDataRow row) throws
IgniteCheckedException {
@@ -915,6 +916,19 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null,
null, withTombstones);
}
+ /** {@inheritDoc} Returns an empty iterator when {@code partitionData(part)} yields {@code null}. */
+ @Override public GridIterator<CacheDataRow> tombstonesIterator(int part) {
+ assert locCacheDataStore == null;
+
+ CacheDataStore data = partitionData(part);
+
+ if (data == null)
+ return new GridEmptyCloseableIterator<>();
+
+ // TODO IGNITE-11704: reuses the generic iterator(...) with its last flag set to true, so it presumably returns regular rows too - confirm; clearTombstones() filters with isTombstone().
+ return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null,
null, true);
+ }
+
/**
*
* @param cacheId Cache ID.
@@ -2730,7 +2744,11 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public void removeWithTombstone(GridCacheContext cctx,
KeyCacheObject key, GridCacheVersion ver, int partId) throws
IgniteCheckedException {
+ @Override public void removeWithTombstone(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ GridCacheVersion ver,
+ GridDhtLocalPartition part) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
@@ -2745,6 +2763,8 @@ public class IgniteCacheOffheapManagerImpl implements
IgniteCacheOffheapManager
assert c.operationType() == PUT || c.operationType() ==
IN_PLACE : c.operationType();
+ part.tombstoneCreated();
+
if (!isTombstone(c.oldRow))
cctx.tombstoneCreated();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index e3e6435..13b1761 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -44,6 +44,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -173,6 +174,9 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
/** Set if topology update sequence should be updated on partition
destroy. */
private boolean updateSeqOnDestroy;
+ /** Set to {@code true} by {@link #tombstoneCreated()}; read by {@link #submitClearTombstones()}. */
+ private volatile boolean tombstoneCreated;
+
/**
* @param ctx Context.
* @param grp Cache group.
@@ -619,8 +623,12 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
assert partState == MOVING || partState == LOST;
- if (casState(state, OWNING))
+ if (casState(state, OWNING)) {
+ if (grp.supportsTombstone())
+ clearTombstones();
+
return true;
+ }
}
}
@@ -1117,6 +1125,95 @@ public class GridDhtLocalPartition extends
GridCacheConcurrentMapImpl implements
}
/**
+ * Records that a tombstone was created in this partition by setting the {@code tombstoneCreated} flag.
+ */
+ public void tombstoneCreated() {
+ tombstoneCreated = true;
+ }
+
+ /**
+ * Passes {@link #clearTombstones()} to the closure processor's {@code runLocalSafe} if the {@code tombstoneCreated} flag is set. NOTE(review): no caller is visible in this change.
+ */
+ private void submitClearTombstones() {
+ if (tombstoneCreated)
+
grp.shared().kernalContext().closure().runLocalSafe(this::clearTombstones,
true);
+ }
+
+ /**
+ * Walks the rows from {@code tombstonesIterator(id)} and removes each tombstone row via {@link GridCacheMapEntry#removeTombstone} under the checkpoint read lock; an {@code IgniteCheckedException} is logged, not rethrown.
+ */
+ private void clearTombstones() {
+ final int stopCheckingFreq = 1000;
+
+ CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+
+ try {
+ GridIterator<CacheDataRow> it0 =
grp.offheap().tombstonesIterator(id);
+
+ int cntr = 0;
+
+ while (it0.hasNext()) {
+ CacheDataRow row = it0.next();
+
+ if (!grp.offheap().isTombstone(row))
+ continue;
+
+ assert row.key() != null;
+ assert row.version() != null;
+
+ if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() !=
row.cacheId()))
+ hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+ assert hld != null;
+
+ ctx.database().checkpointReadLock();
+
+ try {
+ while (true) {
+ GridCacheMapEntry cached = null;
+
+ try {
+ cached = putEntryIfObsoleteOrAbsent(
+ hld,
+ hld.cctx,
+ grp.affinity().lastVersion(),
+ row.key(),
+ true,
+ false);
+
+ cached.removeTombstone(row.version());
+ // NOTE(review): the finally block below also touches the entry, so a successful pass calls touch() twice.
+ cached.touch();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ cached = null;
+ }
+ finally {
+ if (cached != null)
+ cached.touch();
+ }
+ }
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+
+ cntr++;
+
+ if (cntr % stopCheckingFreq == 0) {
+ if (ctx.kernalContext().isStopping() || state() != OWNING)
+ break;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed clear tombstone entries for partition: " +
id, e);
+ }
+ }
+
+ /**
* Removes all entries and rows from this partition.
*
* @return Number of rows cleared from page memory.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index d4bcbd8..427c0b9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2423,12 +2423,16 @@ public class GridCacheOffheapManager extends
IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public void removeWithTombstone(GridCacheContext cctx,
KeyCacheObject key, GridCacheVersion ver, int partId) throws
IgniteCheckedException {
+ @Override public void removeWithTombstone(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ GridCacheVersion ver,
+ GridDhtLocalPartition part) throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate = init0(false);
- delegate.removeWithTombstone(cctx, key, ver, partId);
+ delegate.removeWithTombstone(cctx, key, ver, part);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
index 331fb64..05962c4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -28,8 +30,11 @@ import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -45,7 +50,7 @@ import static
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
/**
@@ -122,6 +127,8 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception
{
persistence = true;
+ cleanPersistenceDir();
+
testRemoveAndRebalanceRace(TRANSACTIONAL, true);
}
@@ -169,8 +176,7 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
cache0.putAll(map);
-
TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class,
- getTestIgniteInstanceName(1));
+ blockRebalance(ignite0);
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new
Callable<Object>() {
@Override public Object call() throws Exception {
@@ -180,6 +186,12 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
IgniteEx ignite1 = (IgniteEx)fut.get(30_000);
+ if (persistence) {
+ ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+ ignite0.cluster().setBaselineTopology(2);
+ }
+
Set<Integer> removed = new HashSet<>();
// Do removes while rebalance is in progress.
@@ -195,7 +207,7 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
cacheMetricsRegistryName(DEFAULT_CACHE_NAME,
false)).findMetric("Tombstones");
// On first node there should not be tombstones.
- //assertEquals(0, tombstoneMetric0.get());
+ assertEquals(0, tombstoneMetric0.get());
if (expTombstone)
assertEquals(removed.size(), tombstoneMetric1.get());
@@ -213,7 +225,7 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
assert !removed.isEmpty();
- //assertEquals(0, tombstoneMetric0.get());
+ assertEquals(0, tombstoneMetric0.get());
if (expTombstone)
assertEquals(removed.size(), tombstoneMetric1.get());
@@ -242,6 +254,19 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
assertEquals(0, tombstoneMetric1.get());
}
+ /**
+ * Registers a block predicate on {@code node}'s {@code TestRecordingCommunicationSpi} matching {@code GridDhtPartitionSupplyMessage}s whose group id is that of {@code DEFAULT_CACHE_NAME}.
+ */
+ private void blockRebalance(Ignite node) {
+ final int grpId = groupIdForCache(ignite(0), DEFAULT_CACHE_NAME);
+
+ TestRecordingCommunicationSpi.spi(node).blockMessages(new
IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return (msg instanceof GridDhtPartitionSupplyMessage)
+ && ((GridCacheGroupIdMessage)msg).groupId() == grpId;
+ }
+ });
+ }
/**
* @param atomicityMode Cache atomicity mode.
@@ -253,7 +278,7 @@ public class CacheRemoveWithTombstonesTest extends
GridCommonAbstractTest {
ccfg.setAtomicityMode(atomicityMode);
ccfg.setCacheMode(PARTITIONED);
ccfg.setBackups(2);
- ccfg.setRebalanceMode(SYNC);
+ ccfg.setRebalanceMode(ASYNC);
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
return ccfg;