This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-11704
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 2a0a0e0ea41f3f1aa9daca279546625b168faf81
Author: sboikov <[email protected]>
AuthorDate: Fri Jul 19 21:20:51 2019 +0300

    ignite-11704
---
 .../processors/cache/CacheGroupContext.java        |   3 +-
 .../processors/cache/GridCacheMapEntry.java        |  88 +++++++++++++++++-
 .../cache/IgniteCacheOffheapManager.java           |   3 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  70 ++++++++------
 .../dht/topology/GridDhtLocalPartition.java        | 101 ++++++++++++++++++++-
 .../cache/persistence/CacheDataRowAdapter.java     |  22 ++++-
 .../GridCacheDatabaseSharedManager.java            |   2 +
 .../cache/persistence/GridCacheOffheapManager.java |   8 +-
 .../IgniteCacheDatabaseSharedManager.java          |  59 ++++++++++++
 .../distributed/CacheRemoveWithTombstonesTest.java |  39 ++++++--
 10 files changed, 350 insertions(+), 45 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 4af5de5..7963893 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
@@ -1307,7 +1308,7 @@ public class CacheGroupContext {
     }
 
     public boolean createTombstone(@Nullable GridDhtLocalPartition part) {
-        return part != null && supportsTombstone();
+        return part != null && supportsTombstone() && part.state() == 
GridDhtPartitionState.MOVING;
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index adc8699..08986a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1717,8 +1717,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 }
             }
 
-            if (cctx.group().createTombstone(localPartition()))
-                cctx.offheap().removeWithTombstone(cctx, key, newVer, 
partition(), localPartition());
+            if (cctx.group().createTombstone(localPartition())) {
+                cctx.offheap().removeWithTombstone(cctx, key, newVer, 
localPartition());
+
+                if (!cctx.group().createTombstone(localPartition()))
+                    removeTombstone0(newVer);
+            }
             else
                 removeValue();
 
@@ -2818,6 +2822,34 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @param tombstoneVer Tombstone version.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void removeTombstone(GridCacheVersion tombstoneVer) throws 
GridCacheEntryRemovedException, IgniteCheckedException {
+        lockEntry();
+
+        try {
+            checkObsolete();
+
+            removeTombstone0(tombstoneVer);
+        }
+        finally {
+            unlockEntry();
+        }
+    }
+
+    /**
+     * @param tombstoneVer Tombstone version.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void removeTombstone0(GridCacheVersion tombstoneVer) throws 
IgniteCheckedException {
+        RemoveClosure closure = new RemoveClosure(this, tombstoneVer);
+
+        cctx.offheap().invoke(cctx, key, localPartition(), closure);
+    }
+
+    /**
      * @return {@code True} if this entry should not be evicted from cache.
      */
     protected boolean evictionDisabled() {
@@ -5720,6 +5752,58 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     /**
      *
      */
+    private static class RemoveClosure implements 
IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** */
+        private final GridCacheMapEntry entry;
+
+        /** */
+        private final GridCacheVersion ver;
+
+        /** */
+        private IgniteTree.OperationType op;
+
+        /** */
+        private CacheDataRow oldRow;
+
+        public RemoveClosure(GridCacheMapEntry entry, GridCacheVersion ver) {
+            this.entry = entry;
+            this.ver = ver;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable CacheDataRow oldRow() {
+            return oldRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow row) throws 
IgniteCheckedException {
+            if (row == null || !ver.equals(row.version())) {
+                op = IgniteTree.OperationType.NOOP;
+
+                return;
+            }
+
+            row.key(entry.key);
+
+            oldRow = row;
+
+           op = IgniteTree.OperationType.REMOVE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return op;
+        }
+    }
+
+    /**
+     *
+     */
     private static class UpdateClosure implements 
IgniteCacheOffheapManager.OffheapInvokeClosure {
         /** */
         private final GridCacheMapEntry entry;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index c11e909..2272439 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -406,7 +406,6 @@ public interface IgniteCacheOffheapManager {
         GridCacheContext cctx,
         KeyCacheObject key,
         GridCacheVersion ver,
-        int partId,
         GridDhtLocalPartition part
     ) throws IgniteCheckedException;
 
@@ -917,7 +916,7 @@ public interface IgniteCacheOffheapManager {
          * @param partId Partition number.
          * @throws IgniteCheckedException If failed.
          */
-        public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject 
key, GridCacheVersion ver, int partId) throws IgniteCheckedException;
+        public void removeWithTombstone(GridCacheContext cctx, KeyCacheObject 
key, GridCacheVersion ver, GridDhtLocalPartition part) throws 
IgniteCheckedException;
 
         /**
          * @param cctx Cache context.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c45e3b1..7ae45b48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -179,9 +178,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     /** */
     protected GridStripedLock partStoreLock = new 
GridStripedLock(Runtime.getRuntime().availableProcessors());
 
-    /** */
-    private CacheObject NULL_VAL;
-
     /** {@inheritDoc} */
     @Override public GridAtomicLong globalRemoveId() {
         return globalRmvId;
@@ -203,8 +199,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 if (grp.isLocal())
                     locCacheDataStore = createCacheDataStore(0);
-
-                NULL_VAL = new CacheObjectImpl(null, 
ctx.marshaller().marshal(null));
             }
             finally {
                 ctx.database().checkpointReadUnlock();
@@ -632,28 +626,17 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         GridCacheContext cctx,
         KeyCacheObject key,
         GridCacheVersion ver,
-        int partId,
         GridDhtLocalPartition part) throws IgniteCheckedException {
-        dataStore(part).removeWithTombstone(cctx, key, ver, partId);
+        assert part != null;
+
+        dataStore(part).removeWithTombstone(cctx, key, ver, part);
     }
 
     @Override public boolean isTombstone(CacheDataRow row) throws 
IgniteCheckedException {
-        if (row == null || !grp.supportsTombstone())
+        if (!grp.supportsTombstone())
             return false;
 
-        CacheObject val = row.value();
-
-        assert val != null : row;
-
-        if (val.cacheObjectType() == CacheObject.TYPE_REGULAR) {
-            byte[] null_bytes = NULL_VAL.valueBytes(null);
-            byte[] bytes = val.valueBytes(null);
-
-            if (Arrays.equals(null_bytes, bytes))
-                return true;
-        }
-
-        return false;
+        return grp.shared().database().isTombstone(row);
     }
 
     /** {@inheritDoc} */
@@ -2712,7 +2695,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 this.oldRow = oldRow;
 
-                newRow = createRow(cctx, key, NULL_VAL, ver, 0, oldRow);
+                newRow = createRow(cctx, key, 
cctx.shared().database().tombstoneValue(), ver, 0, oldRow);
             }
 
             /** {@inheritDoc} */
@@ -2730,7 +2713,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public void removeWithTombstone(GridCacheContext cctx, 
KeyCacheObject key, GridCacheVersion ver, int partId) throws 
IgniteCheckedException {
+        @Override public void removeWithTombstone(
+                GridCacheContext cctx,
+                KeyCacheObject key,
+                GridCacheVersion ver,
+                GridDhtLocalPartition part) throws IgniteCheckedException {
             if (!busyLock.enterBusy())
                 throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
 
@@ -2745,6 +2732,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert c.operationType() == PUT || c.operationType() == 
IN_PLACE : c.operationType();
 
+                part.tombstoneCreated();
+
                 if (!isTombstone(c.oldRow))
                     cctx.tombstoneCreated();
 
@@ -2933,7 +2922,34 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         @Override public GridCursor<? extends CacheDataRow> cursor(boolean 
withTombstones) throws IgniteCheckedException {
             GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
 
-            return withTombstones ? cur : cursorSkipTombstone(cur);
+            return withTombstones ? cursorSkipEmpty(cur) : 
cursorSkipTombstone(cur);
+        }
+
+        private GridCursor<? extends CacheDataRow> cursorSkipEmpty(final 
GridCursor<? extends CacheDataRow> cur) {
+            if (!grp.supportsTombstone())
+                return cur;
+
+            return new GridCursor<CacheDataRow>() {
+                CacheDataRow next;
+
+                @Override public boolean next() throws IgniteCheckedException {
+                    while (cur.next()) {
+                        CacheDataRow next = cur.get();
+
+                        if (next.version() != null) {
+                            this.next = next;
+
+                            return true;
+                        }
+                    }
+
+                    return false;
+                }
+
+                @Override public CacheDataRow get() {
+                    return next;
+                }
+            };
         }
 
         private GridCursor<? extends CacheDataRow> cursorSkipTombstone(final 
GridCursor<? extends CacheDataRow> cur) {
@@ -2965,7 +2981,9 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
         /** {@inheritDoc} */
         @Override public GridCursor<? extends CacheDataRow> cursor(Object x) 
throws IgniteCheckedException {
-            return cursorSkipTombstone(dataTree.find(null, null, x));
+            GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null, 
x);
+
+            return x == CacheDataRowAdapter.RowData.TOMBSTONES ? 
cursorSkipEmpty(cur) : cursorSkipTombstone(cur);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index e3e6435..b5a63e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -54,12 +55,14 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -173,6 +176,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     /** Set if topology update sequence should be updated on partition 
destroy. */
     private boolean updateSeqOnDestroy;
 
+    /** */
+    private volatile boolean tombstoneCreated;
+
     /**
      * @param ctx Context.
      * @param grp Cache group.
@@ -619,8 +625,12 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
             assert partState == MOVING || partState == LOST;
 
-            if (casState(state, OWNING))
+            if (casState(state, OWNING)) {
+                if (grp.supportsTombstone())
+                    submitClearTombstones();
+
                 return true;
+            }
         }
     }
 
@@ -1117,6 +1127,95 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
+     *
+     */
+    public void tombstoneCreated() {
+        tombstoneCreated = true;
+    }
+
+    /**
+     *
+     */
+    private void submitClearTombstones() {
+        if (tombstoneCreated)
+            
grp.shared().kernalContext().closure().runLocalSafe(this::clearTombstones, 
true);
+    }
+
+    /**
+     *
+     */
+    private void clearTombstones() {
+        final int stopCheckingFreq = 1000;
+
+        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
+
+        try {
+            GridCursor<? extends CacheDataRow> cur = 
store.cursor(CacheDataRowAdapter.RowData.TOMBSTONES);
+
+            int cntr = 0;
+
+            while (cur.next()) {
+                CacheDataRow row = cur.get();
+
+                if (!grp.offheap().isTombstone(row))
+                    continue;
+
+                assert row.key() != null;
+                assert row.version() != null;
+
+                if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != 
row.cacheId()))
+                    hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+                assert hld != null;
+
+                ctx.database().checkpointReadLock();
+
+                try {
+                    while (true) {
+                        GridCacheMapEntry cached = null;
+
+                        try {
+                            cached = putEntryIfObsoleteOrAbsent(
+                                hld,
+                                hld.cctx,
+                                grp.affinity().lastVersion(),
+                                row.key(),
+                                true,
+                                false);
+
+                            cached.removeTombstone(row.version());
+
+                            cached.touch();
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException e) {
+                            cached = null;
+                        }
+                        finally {
+                            if (cached != null)
+                                cached.touch();
+                        }
+                    }
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
+
+                cntr++;
+
+                if (cntr % stopCheckingFreq == 0) {
+                    if (ctx.kernalContext().isStopping() || state() != OWNING)
+                        break;
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed clear tombstone entries for partition: " + 
id, e);
+        }
+    }
+
+    /**
      * Removes all entries and rows from this partition.
      *
      * @return Number of rows cleared from page memory.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index f27a311..df46885 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -498,6 +498,14 @@ public class CacheDataRowAdapter implements CacheDataRow {
         int len = PageUtils.getInt(addr, off);
         off += 4;
 
+        boolean tombstones = rowData == RowData.TOMBSTONES;
+
+        if (tombstones && !sharedCtx.database().isTombstone(addr + off + len + 
1)) {
+            verReady = true;
+
+            return;
+        }
+
         if (rowData != RowData.NO_KEY && rowData != RowData.NO_KEY_WITH_HINTS) 
{
             byte type = PageUtils.getByte(addr, off);
             off++;
@@ -519,10 +527,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
         byte type = PageUtils.getByte(addr, off);
         off++;
 
-        byte[] bytes = PageUtils.getBytes(addr, off, len);
-        off += len;
+        if (!tombstones) {
+            byte[] bytes = PageUtils.getBytes(addr, off, len);
+
+            val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, 
type, bytes);
+        }
 
-        val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, 
bytes);
+        off += len;
 
         int verLen;
 
@@ -941,7 +952,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
         FULL_WITH_HINTS,
 
         /** Force instant hints actualization for update operation with 
history (to avoid races with vacuum). */
-        NO_KEY_WITH_HINTS
+        NO_KEY_WITH_HINTS,
+
+        /** Do not read row data for non-tombstone entries. */
+        TOMBSTONES
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 96cbe2d..8b7ef11 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -120,6 +120,8 @@ import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRec
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index d4bcbd8..427c0b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2423,12 +2423,16 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public void removeWithTombstone(GridCacheContext cctx, 
KeyCacheObject key, GridCacheVersion ver, int partId) throws 
IgniteCheckedException {
+        @Override public void removeWithTombstone(
+                GridCacheContext cctx,
+                KeyCacheObject key,
+                GridCacheVersion ver,
+                GridDhtLocalPartition part) throws IgniteCheckedException {
             assert ctx.database().checkpointLockIsHeldByThread();
 
             CacheDataStore delegate = init0(false);
 
-            delegate.removeWithTombstone(cctx, key, ver, partId);
+            delegate.removeWithTombstone(cctx, key, ver, part);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 69a5d50..d1ece85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.persistence;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -45,10 +46,13 @@ import org.apache.ignite.internal.mem.DirectMemoryRegion;
 import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -130,6 +134,8 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
     /** First eviction was warned flag. */
     private volatile boolean firstEvictWarn;
 
+    /** */
+    private CacheObject TOMBSTONE_VAL;
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
@@ -145,8 +151,61 @@ public class IgniteCacheDatabaseSharedManager extends 
GridCacheSharedManagerAdap
         pageSize = memCfg.getPageSize();
 
         initDataRegions(memCfg);
+
+        TOMBSTONE_VAL = new CacheObjectImpl(null, 
cctx.marshaller().marshal(null));
+    }
+
+    public CacheObject tombstoneValue() {
+        return TOMBSTONE_VAL;
+    }
+
+    public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException 
{
+        if (row == null)
+            return false;
+
+        CacheObject val = row.value();
+
+        assert val != null : row;
+
+        if (val.cacheObjectType() == CacheObject.TYPE_REGULAR) {
+            byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null);
+            byte[] bytes = val.valueBytes(null);
+
+            if (Arrays.equals(nullBytes, bytes))
+                return true;
+        }
+
+        return false;
     }
 
+    public boolean isTombstone(long addr) throws IgniteCheckedException {
+        int off = 0;
+
+        byte type = PageUtils.getByte(addr, off + 4);
+
+        if (type != CacheObject.TYPE_REGULAR)
+            return false;
+
+        byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null);
+
+        int len = PageUtils.getInt(addr, off);
+
+        if (len != nullBytes.length)
+            return false;
+
+        off += 5;
+
+        for (int i = 0; i < len; i++) {
+            byte b = PageUtils.getByte(addr, off++);
+
+            if (nullBytes[i] != b)
+                return false;
+        }
+
+        return true;
+    }
+
+
     /**
      * @param cfg Ignite configuration.
      * @param groupName Name of group.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
index 331fb64..05962c4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheRemoveWithTombstonesTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -28,8 +30,11 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -45,7 +50,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static 
org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
 
 /**
@@ -122,6 +127,8 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
     public void testRemoveAndRebalanceRaceTxWithPersistence() throws Exception 
{
         persistence = true;
 
+        cleanPersistenceDir();
+
         testRemoveAndRebalanceRace(TRANSACTIONAL, true);
     }
 
@@ -169,8 +176,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         cache0.putAll(map);
 
-        
TestRecordingCommunicationSpi.spi(ignite0).blockMessages(GridDhtPartitionSupplyMessageV2.class,
-                getTestIgniteInstanceName(1));
+        blockRebalance(ignite0);
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
             @Override public Object call() throws Exception {
@@ -180,6 +186,12 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         IgniteEx ignite1 = (IgniteEx)fut.get(30_000);
 
+        if (persistence) {
+            ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+            ignite0.cluster().setBaselineTopology(2);
+        }
+
         Set<Integer> removed = new HashSet<>();
 
         // Do removes while rebalance is in progress.
@@ -195,7 +207,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
                 cacheMetricsRegistryName(DEFAULT_CACHE_NAME, 
false)).findMetric("Tombstones");
 
         // On first node there should not be tombstones.
-        //assertEquals(0, tombstoneMetric0.get());
+        assertEquals(0, tombstoneMetric0.get());
 
         if (expTombstone)
             assertEquals(removed.size(), tombstoneMetric1.get());
@@ -213,7 +225,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         assert !removed.isEmpty();
 
-        //assertEquals(0, tombstoneMetric0.get());
+        assertEquals(0, tombstoneMetric0.get());
 
         if (expTombstone)
             assertEquals(removed.size(), tombstoneMetric1.get());
@@ -242,6 +254,19 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
 
         assertEquals(0, tombstoneMetric1.get());
     }
+    /**
+     *
+     */
+    private void blockRebalance(Ignite node) {
+        final int grpId = groupIdForCache(ignite(0), DEFAULT_CACHE_NAME);
+
+        TestRecordingCommunicationSpi.spi(node).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return (msg instanceof GridDhtPartitionSupplyMessage)
+                        && ((GridCacheGroupIdMessage)msg).groupId() == grpId;
+            }
+        });
+    }
 
     /**
      * @param atomicityMode Cache atomicity mode.
@@ -253,7 +278,7 @@ public class CacheRemoveWithTombstonesTest extends 
GridCommonAbstractTest {
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setBackups(2);
-        ccfg.setRebalanceMode(SYNC);
+        ccfg.setRebalanceMode(ASYNC);
         
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
         return ccfg;

Reply via email to