This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-11704 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 2a0a0e0ea41f3f1aa9daca279546625b168faf81 Author: sboikov <[email protected]> AuthorDate: Fri Jul 19 21:20:51 2019 +0300 ignite-11704 --- .../processors/cache/CacheGroupContext.java | 3 +- .../processors/cache/GridCacheMapEntry.java | 88 +++++++++++++++++- .../cache/IgniteCacheOffheapManager.java | 3 +- .../cache/IgniteCacheOffheapManagerImpl.java | 70 ++++++++------ .../dht/topology/GridDhtLocalPartition.java | 101 ++++++++++++++++++++- .../cache/persistence/CacheDataRowAdapter.java | 22 ++++- .../GridCacheDatabaseSharedManager.java | 2 + .../cache/persistence/GridCacheOffheapManager.java | 8 +- .../IgniteCacheDatabaseSharedManager.java | 59 ++++++++++++ .../distributed/CacheRemoveWithTombstonesTest.java | 39 ++++++-- 10 files changed, 350 insertions(+), 45 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 4af5de5..7963893 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; @@ -1307,7 +1308,7 @@ public class CacheGroupContext { } public boolean createTombstone(@Nullable GridDhtLocalPartition part) { - return part != null && supportsTombstone(); + return part != null && supportsTombstone() && part.state() == GridDhtPartitionState.MOVING; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index adc8699..08986a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1717,8 +1717,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - if (cctx.group().createTombstone(localPartition())) - cctx.offheap().removeWithTombstone(cctx, key, newVer, partition(), localPartition()); + if (cctx.group().createTombstone(localPartition())) { + cctx.offheap().removeWithTombstone(cctx, key, newVer, localPartition()); + + if (!cctx.group().createTombstone(localPartition())) + removeTombstone0(newVer); + } else removeValue(); @@ -2818,6 +2822,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @param tombstoneVer Tombstone version. + * @throws GridCacheEntryRemovedException If entry was removed. + * @throws IgniteCheckedException If failed. + */ + public void removeTombstone(GridCacheVersion tombstoneVer) throws GridCacheEntryRemovedException, IgniteCheckedException { + lockEntry(); + + try { + checkObsolete(); + + removeTombstone0(tombstoneVer); + } + finally { + unlockEntry(); + } + } + + /** + * @param tombstoneVer Tombstone version. + * @throws IgniteCheckedException If failed. + */ + private void removeTombstone0(GridCacheVersion tombstoneVer) throws IgniteCheckedException { + RemoveClosure closure = new RemoveClosure(this, tombstoneVer); + + cctx.offheap().invoke(cctx, key, localPartition(), closure); + } + + /** * @return {@code True} if this entry should not be evicted from cache. */ protected boolean evictionDisabled() { @@ -5720,6 +5752,58 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * */ + private static class RemoveClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { + /** */ + private final GridCacheMapEntry entry; + + /** */ + private final GridCacheVersion ver; + + /** */ + private IgniteTree.OperationType op; + + /** */ + private CacheDataRow oldRow; + + public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) { + this.entry = entry; + this.ver = ver; + } + + /** {@inheritDoc} */ + @Override public @Nullable CacheDataRow oldRow() { + return oldRow; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable CacheDataRow row) throws IgniteCheckedException { + if (row == null || !ver.equals(row.version())) { + op = IgniteTree.OperationType.NOOP; + + return; + } + + row.key(entry.key); + + oldRow = row; + + op = IgniteTree.OperationType.REMOVE; + } + + /** {@inheritDoc} */ + @Override public CacheDataRow newRow() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return op; + } + } + + /** + * + */ private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure { /** */ private final GridCacheMapEntry entry; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index c11e909..2272439 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -406,7 +406,6 @@ public interface IgniteCacheOffheapManager { GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, - int partId, GridDhtLocalPartition part ) throws IgniteCheckedException; @@ -917,7 +916,7 @@ public interface IgniteCacheOffheapManager { * @param partId Partition number. * @throws IgniteCheckedException If failed. */ - public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException; + public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, GridDhtLocalPartition part) throws IgniteCheckedException; /** * @param cctx Cache context. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index c45e3b1..7ae45b48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -179,9 +178,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** */ protected GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors()); - /** */ - private CacheObject NULL_VAL; - /** {@inheritDoc} */ @Override public GridAtomicLong globalRemoveId() { return globalRmvId; @@ -203,8 +199,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (grp.isLocal()) locCacheDataStore = createCacheDataStore(0); - - NULL_VAL = new CacheObjectImpl(null, ctx.marshaller().marshal(null)); } finally { ctx.database().checkpointReadUnlock(); @@ -632,28 +626,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, - int partId, GridDhtLocalPartition part) throws IgniteCheckedException { - dataStore(part).removeWithTombstone(cctx, key, ver, partId); + assert part != null; + + dataStore(part).removeWithTombstone(cctx, key, ver, part); } @Override public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException { - if (row == null || !grp.supportsTombstone()) + if (!grp.supportsTombstone()) return false; - CacheObject val = row.value(); - - assert val != null : row; - - if (val.cacheObjectType() == CacheObject.TYPE_REGULAR) { - byte[] null_bytes = NULL_VAL.valueBytes(null); - byte[] bytes = val.valueBytes(null); - - if (Arrays.equals(null_bytes, bytes)) - return true; - } - - return false; + return grp.shared().database().isTombstone(row); } /** {@inheritDoc} */ @@ -2712,7 +2695,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager this.oldRow = oldRow; - newRow = createRow(cctx, key, NULL_VAL, ver, 0, oldRow); + newRow = createRow(cctx, key, cctx.shared().database().tombstoneValue(), ver, 0, oldRow); } /** {@inheritDoc} */ @@ -2730,7 +2713,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException { + @Override public void removeWithTombstone( + GridCacheContext cctx, + KeyCacheObject key, + GridCacheVersion ver, + GridDhtLocalPartition part) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); @@ -2745,6 +2732,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert c.operationType() == PUT || c.operationType() == IN_PLACE : c.operationType(); + part.tombstoneCreated(); + if (!isTombstone(c.oldRow)) cctx.tombstoneCreated(); @@ -2933,7 +2922,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager @Override public GridCursor<? extends CacheDataRow> cursor(boolean withTombstones) throws IgniteCheckedException { GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null); - return withTombstones ? cur : cursorSkipTombstone(cur); + return withTombstones ? cursorSkipEmpty(cur) : cursorSkipTombstone(cur); + } + + private GridCursor<? extends CacheDataRow> cursorSkipEmpty(final GridCursor<? extends CacheDataRow> cur) { + if (!grp.supportsTombstone()) + return cur; + + return new GridCursor<CacheDataRow>() { + CacheDataRow next; + + @Override public boolean next() throws IgniteCheckedException { + while (cur.next()) { + CacheDataRow next = cur.get(); + + if (next.version() != null) { + this.next = next; + + return true; + } + } + + return false; + } + + @Override public CacheDataRow get() { + return next; + } + }; } private GridCursor<? extends CacheDataRow> cursorSkipTombstone(final GridCursor<? extends CacheDataRow> cur) { @@ -2965,7 +2981,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException { - return cursorSkipTombstone(dataTree.find(null, null, x)); + GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null, x); + + return x == CacheDataRowAdapter.RowData.TOMBSTONES ? cursorSkipEmpty(cur) : cursorSkipTombstone(cur); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java index e3e6435..b5a63e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -54,12 +55,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -173,6 +176,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** Set if topology update sequence should be updated on partition destroy. */ private boolean updateSeqOnDestroy; + /** */ + private volatile boolean tombstoneCreated; + /** * @param ctx Context. * @param grp Cache group. @@ -619,8 +625,12 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements assert partState == MOVING || partState == LOST; - if (casState(state, OWNING)) + if (casState(state, OWNING)) { + if (grp.supportsTombstone()) + submitClearTombstones(); + return true; + } } } @@ -1117,6 +1127,95 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * + */ + public void tombstoneCreated() { + tombstoneCreated = true; + } + + /** + * + */ + private void submitClearTombstones() { + if (tombstoneCreated) + grp.shared().kernalContext().closure().runLocalSafe(this::clearTombstones, true); + } + + /** + * + */ + private void clearTombstones() { + final int stopCheckingFreq = 1000; + + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; + + try { + GridCursor<? extends CacheDataRow> cur = store.cursor(CacheDataRowAdapter.RowData.TOMBSTONES); + + int cntr = 0; + + while (cur.next()) { + CacheDataRow row = cur.get(); + + if (!grp.offheap().isTombstone(row)) + continue; + + assert row.key() != null; + assert row.version() != null; + + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) + hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); + + assert hld != null; + + ctx.database().checkpointReadLock(); + + try { + while (true) { + GridCacheMapEntry cached = null; + + try { + cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, + grp.affinity().lastVersion(), + row.key(), + true, + false); + + cached.removeTombstone(row.version()); + + cached.touch(); + + break; + } + catch (GridCacheEntryRemovedException e) { + cached = null; + } + finally { + if (cached != null) + cached.touch(); + } + } + } + finally { + ctx.database().checkpointReadUnlock(); + } + + cntr++; + + if (cntr % stopCheckingFreq == 0) { + if (ctx.kernalContext().isStopping() || state() != OWNING) + break; + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed clear tombstone entries for partition: " + id, e); + } + } + + /** * Removes all entries and rows from this partition. * * @return Number of rows cleared from page memory. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index f27a311..df46885 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -498,6 +498,14 @@ public class CacheDataRowAdapter implements CacheDataRow { int len = PageUtils.getInt(addr, off); off += 4; + boolean tombstones = rowData == RowData.TOMBSTONES; + + if (tombstones && !sharedCtx.database().isTombstone(addr + off + len + 1)) { + verReady = true; + + return; + } + if (rowData != RowData.NO_KEY && rowData != RowData.NO_KEY_WITH_HINTS) { byte type = PageUtils.getByte(addr, off); off++; @@ -519,10 +527,13 @@ public class CacheDataRowAdapter implements CacheDataRow { byte type = PageUtils.getByte(addr, off); off++; - byte[] bytes = PageUtils.getBytes(addr, off, len); - off += len; + if (!tombstones) { + byte[] bytes = PageUtils.getBytes(addr, off, len); + + val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes); + } - val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes); + off += len; int verLen; @@ -941,7 +952,10 @@ public class CacheDataRowAdapter implements CacheDataRow { FULL_WITH_HINTS, /** Force instant hints actualization for update operation with history (to avoid races with vacuum). */ - NO_KEY_WITH_HINTS + NO_KEY_WITH_HINTS, + + /** Do not read row data for non-tombstone entries. */ + TOMBSTONES } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 96cbe2d..8b7ef11 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -120,6 +120,8 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRec import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index d4bcbd8..427c0b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -2423,12 +2423,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject key, GridCacheVersion ver, int partId) throws IgniteCheckedException { + @Override public void removeWithTombstone( + GridCacheContext cctx, + KeyCacheObject key, + GridCacheVersion ver, + GridDhtLocalPartition part) throws IgniteCheckedException { assert ctx.database().checkpointLockIsHeldByThread(); CacheDataStore delegate = init0(false); - delegate.removeWithTombstone(cctx, key, ver, partId); + delegate.removeWithTombstone(cctx, key, ver, part); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index 69a5d50..d1ece85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.io.File; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -45,10 +46,13 @@ import org.apache.ignite.internal.mem.DirectMemoryRegion; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; @@ -130,6 +134,8 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** First eviction was warned flag. */ private volatile boolean firstEvictWarn; + /** */ + private CacheObject TOMBSTONE_VAL; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -145,8 +151,61 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap pageSize = memCfg.getPageSize(); initDataRegions(memCfg); + + TOMBSTONE_VAL = new CacheObjectImpl(null, cctx.marshaller().marshal(null)); + } + + public CacheObject tombstoneValue() { + return TOMBSTONE_VAL; + } + + public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException { + if (row == null) + return false; + + CacheObject val = row.value(); + + assert val != null : row; + + if (val.cacheObjectType() == CacheObject.TYPE_REGULAR) { + byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null); + byte[] bytes = val.valueBytes(null); + + if (Arrays.equals(nullBytes, bytes)) + return true; + } + + return false; } + public boolean isTombstone(long addr) throws IgniteCheckedException { + int off = 0; + + byte type = PageUtils.getByte(addr, off + 4); + + if (type != CacheObject.TYPE_REGULAR) + return false; + + byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null); + + int len = PageUtils.getInt(addr, off); + + if (len != nullBytes.length) + return false; + + off += 5; + + for (int i = 0; i < len; i++) { + byte b = PageUtils.getByte(addr, off++); + + if (nullBytes[i] != b) + return false; + } + + return true; + } + + /** * @param cfg Ignite configuration. * @param groupName Name of group. diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java index 331fb64..05962c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -28,8 +30,11 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -45,7 +50,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; /** @@ -122,6 +127,8 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception { persistence = true; + cleanPersistenceDir(); + testRemoveAndRebalanceRace(TRANSACTIONAL, true); } @@ -169,8 +176,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { cache0.putAll(map); - TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class, - getTestIgniteInstanceName(1)); + blockRebalance(ignite0); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -180,6 +186,12 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { IgniteEx ignite1 = (IgniteEx)fut.get(30_000); + if (persistence) { + ignite0.cluster().baselineAutoAdjustEnabled(false); + + ignite0.cluster().setBaselineTopology(2); + } + Set<Integer> removed = new HashSet<>(); // Do removes while rebalance is in progress. @@ -195,7 +207,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones"); // On first node there should not be tombstones. - //assertEquals(0, tombstoneMetric0.get()); + assertEquals(0, tombstoneMetric0.get()); if (expTombstone) assertEquals(removed.size(), tombstoneMetric1.get()); @@ -213,7 +225,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { assert !removed.isEmpty(); - //assertEquals(0, tombstoneMetric0.get()); + assertEquals(0, tombstoneMetric0.get()); if (expTombstone) assertEquals(removed.size(), tombstoneMetric1.get()); @@ -242,6 +254,19 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { assertEquals(0, tombstoneMetric1.get()); } + /** + * + */ + private void blockRebalance(Ignite node) { + final int grpId = groupIdForCache(ignite(0), DEFAULT_CACHE_NAME); + + TestRecordingCommunicationSpi.spi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + return (msg instanceof GridDhtPartitionSupplyMessage) + && ((GridCacheGroupIdMessage)msg).groupId() == grpId; + } + }); + } /** * @param atomicityMode Cache atomicity mode. @@ -253,7 +278,7 @@ public class CacheRemoveWithTombstonesTest extends GridCommonAbstractTest { ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(PARTITIONED); ccfg.setBackups(2); - ccfg.setRebalanceMode(SYNC); + ccfg.setRebalanceMode(ASYNC); ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); return ccfg;
