IGNITE-10348 Safely recreate metastore to mitigate IGNITE-8735

Signed-off-by: Ivan Rakov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86f92d96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86f92d96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86f92d96

Branch: refs/heads/ignite-10189
Commit: 86f92d9697e6bb1f138b5d2e477d7eb5cf8c8814
Parents: 137a0e7
Author: Alexey Stelmak <[email protected]>
Authored: Fri Dec 7 18:56:48 2018 +0300
Committer: Ivan Rakov <[email protected]>
Committed: Fri Dec 7 18:56:48 2018 +0300

----------------------------------------------------------------------
 .../internal/pagemem/PageIdAllocator.java       |   5 +-
 .../processors/cache/GridCacheProcessor.java    |  18 +
 .../persistence/file/FilePageStoreManager.java  |   8 +-
 .../persistence/metastorage/MetaStorage.java    | 406 ++++++++++++++++++-
 .../metastorage/MetastorageDataRow.java         |   3 +-
 .../metastorage/MetastorageTree.java            |  11 +-
 .../IgnitePdsCorruptedStoreTest.java            |   3 +-
 .../metastorage/IgniteMetaStorageBasicTest.java | 273 ++++++++++++-
 8 files changed, 702 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
index b3e4b07..d91d31d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java
@@ -35,8 +35,11 @@ public interface PageIdAllocator {
     /** Special partition reserved for index space. */
     public static final int INDEX_PARTITION = 0xFFFF;
 
+    /** Old special partition reserved for metastore space. */
+    public static final int OLD_METASTORE_PARTITION = 0x0;
+
     /** Special partition reserved for metastore space. */
-    public static final int METASTORE_PARTITION = 0x0;
+    public static final int METASTORE_PARTITION = 0x1;
 
     /**
      * Allocates a page from the space for the given partition ID and the 
given flags.

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 375dd12..3e32ead 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -105,6 +105,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -277,6 +278,9 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Cache recovery lifecycle state and actions. */
     private final CacheRecoveryLifecycle recovery = new 
CacheRecoveryLifecycle();
 
+    /** Tmp storage for meta migration. */
+    private MetaStorage.TmpStorage tmpStorage;
+
     /**
      * @param ctx Kernal context.
      */
@@ -5471,6 +5475,20 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Returns the temporary storage used for metastorage migration.
+     *
+     * @return Current value of the {@code tmpStorage} field, or {@code null} if none is set.
+     */
+    public MetaStorage.TmpStorage getTmpStorage() {
+        return tmpStorage;
+    }
+
+    /**
+     * Sets the temporary storage used for metastorage migration.
+     *
+     * @param tmpStorage New value of the {@code tmpStorage} field; {@code null} clears it.
+     */
+    public void setTmpStorage(MetaStorage.TmpStorage tmpStorage) {
+        this.tmpStorage = tmpStorage;
+    }
+
+    /**
      * Recovery lifecycle for caches.
      */
     private class CacheRecoveryLifecycle implements 
MetastorageLifecycleListener, DatabaseLifecycleListener {

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index e80107f..5a4eb8b 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -374,10 +374,10 @@ public class FilePageStoreManager extends 
GridCacheSharedManagerAdapter implemen
 
             CacheStoreHolder holder = initDir(
                 new File(storeWorkDir, META_STORAGE_NAME),
-                    grpId,
-                    1,
-                    dataRegion.memoryMetrics(),
-                    false);
+                grpId,
+                PageIdAllocator.METASTORE_PARTITION + 1,
+                dataRegion.memoryMetrics(),
+                false);
 
             CacheStoreHolder old = idxCacheStores.put(grpId, holder);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index a61bf64..7ff8257 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -17,13 +17,24 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -32,10 +43,11 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IncompleteObject;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
@@ -44,6 +56,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListe
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
@@ -58,12 +71,14 @@ import 
org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.NotNull;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.OLD_METASTORE_PARTITION;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
 
@@ -77,9 +92,19 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     /** */
     public static final int METASTORAGE_CACHE_ID = 
CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** This flag is used ONLY FOR TESTING the migration of a metastorage from 
Part 0 to Part 1. */
+    public static boolean PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
+
     /** Marker for removed entry. */
     private static final byte[] TOMBSTONE = new byte[0];
 
+    /** Temporary metastorage memory size. */
+    private static final int TEMPORARY_METASTORAGE_IN_MEMORY_SIZE = 128 * 1024 
* 1024;
+
+    /** Temporary metastorage buffer size (file). */
+    private static final int TEMPORARY_METASTORAGE_BUFFER_SIZE = 1024 * 1024;
+
+
     /** */
     private final IgniteWriteAheadLogManager wal;
 
@@ -99,7 +124,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     private DataRegionMetricsImpl regionMetrics;
 
     /** */
-    private boolean readOnly;
+    private final boolean readOnly;
 
     /** */
     private boolean empty;
@@ -122,6 +147,12 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     /** */
     private final FailureProcessor failureProcessor;
 
+    /** Partition id. */
+    private int partId;
+
+    /** Shared cache context. */
+    private final GridCacheSharedContext cctx;
+
     /** */
     public MetaStorage(
         GridCacheSharedContext cctx,
@@ -129,6 +160,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
         DataRegionMetricsImpl regionMetrics,
         boolean readOnly
     ) {
+        this.cctx = cctx;
         wal = cctx.wal();
         this.dataRegion = dataRegion;
         this.regionMetrics = regionMetrics;
@@ -138,10 +170,79 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     }
 
     /** */
-    public void init(IgniteCacheDatabaseSharedManager db) throws 
IgniteCheckedException {
+    public void init(GridCacheDatabaseSharedManager db) throws 
IgniteCheckedException {
         regionMetrics.clear();
+        initInternal(db);
+
+        if (!PRESERVE_LEGACY_METASTORAGE_PARTITION_ID) {
+            GridCacheProcessor gcProcessor = cctx.kernalContext().cache();
+
+            if (partId == OLD_METASTORE_PARTITION)
+                gcProcessor.setTmpStorage(copyDataToTmpStorage());
+            else if (gcProcessor.getTmpStorage() != null) {
+                restoreDataFromTmpStorage(gcProcessor.getTmpStorage());
+
+                gcProcessor.setTmpStorage(null);
+
+                // remove old partitions
+                CacheGroupContext cgc = 
cctx.cache().cacheGroup(METASTORAGE_CACHE_ID);
 
-        getOrAllocateMetas();
+                if (cgc != null) {
+                    db.schedulePartitionDestroy(METASTORAGE_CACHE_ID, 
OLD_METASTORE_PARTITION);
+
+                    db.schedulePartitionDestroy(METASTORAGE_CACHE_ID, 
PageIdAllocator.INDEX_PARTITION);
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies all data from the metastorage tree to a newly created temporary storage.
+     *
+     * @return Temporary storage holding the key and value of every row visited by the cursor.
+     * @throws IgniteCheckedException Propagated from the tree cursor or from the temporary storage.
+     */
+    private TmpStorage copyDataToTmpStorage() throws IgniteCheckedException {
+        TmpStorage tmpStorage = new 
TmpStorage(TEMPORARY_METASTORAGE_IN_MEMORY_SIZE, log);
+
+        // No lower or upper bound is passed to the tree lookup.
+        GridCursor<MetastorageDataRow> cur = tree.find(null, null);
+
+        while (cur.next()) {
+            MetastorageDataRow row = cur.get();
+
+            tmpStorage.add(row.key(), row.value());
+        }
+
+        return tmpStorage;
+    }
+
+    /**
+     * Restores data from the temporary storage: every key/value pair it streams is passed to
+     * {@code putData}, after which the temporary storage is closed.
+     *
+     * NOTE(review): {@code close()} is not in a {@code finally} block, so it is not reached if
+     * {@code putData} or the iteration throws.
+     *
+     * @param tmpStorage Temporary storage to read from and then close.
+     * @throws IgniteCheckedException Declared by the signature; presumably propagated from {@code putData}.
+     */
+    private void restoreDataFromTmpStorage(TmpStorage tmpStorage) throws 
IgniteCheckedException {
+        for (Iterator<IgniteBiTuple<String, byte[]>> it = 
tmpStorage.stream().iterator(); it.hasNext(); ) {
+            IgniteBiTuple<String, byte[]> t = it.next();
+
+            putData(t.get1(), t.get2());
+        }
+
+        // A failure to close is only logged; it does not fail the restore.
+        try {
+            tmpStorage.close();
+        }
+        catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @param db Database.
+     */
+    private void initInternal(IgniteCacheDatabaseSharedManager db) throws 
IgniteCheckedException {
+        if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID)
+            getOrAllocateMetas(partId = 
PageIdAllocator.OLD_METASTORE_PARTITION);
+        else if (!readOnly || getOrAllocateMetas(partId = 
PageIdAllocator.OLD_METASTORE_PARTITION))
+            getOrAllocateMetas(partId = PageIdAllocator.METASTORE_PARTITION);
 
         if (!empty) {
             freeList = new FreeListImpl(METASTORAGE_CACHE_ID, "metastorage",
@@ -151,7 +252,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
             MetastorageRowStore rowStore = new MetastorageRowStore(freeList, 
db);
 
             tree = new MetastorageTree(METASTORAGE_CACHE_ID, 
dataRegion.pageMemory(), wal, rmvId,
-                freeList, rowStore, treeRoot.pageId().pageId(), 
treeRoot.isAllocated(), failureProcessor);
+                freeList, rowStore, treeRoot.pageId().pageId(), 
treeRoot.isAllocated(), failureProcessor, partId);
 
             if (!readOnly)
                 
((GridCacheDatabaseSharedManager)db).addCheckpointListener(this);
@@ -226,6 +327,23 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
         return res;
     }
 
+    /**
+     * Reads all items from the metastore.
+     *
+     * @return Newly created list with one (key, value) tuple per row visited by the tree cursor.
+     * @throws IgniteCheckedException Propagated from the tree cursor.
+     */
+    public Collection<IgniteBiTuple<String, byte[]>> readAll() throws 
IgniteCheckedException {
+        ArrayList<IgniteBiTuple<String, byte[]>> res = new ArrayList<>();
+
+        // No lower or upper bound is passed to the tree lookup.
+        GridCursor<MetastorageDataRow> cur = tree.find(null, null);
+
+        while (cur.next()) {
+            MetastorageDataRow row = cur.get();
+
+            res.add(new IgniteBiTuple<>(row.key(), row.value()));
+        }
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public void write(@NotNull String key, @NotNull Serializable 
val) throws IgniteCheckedException {
         assert val != null;
@@ -314,11 +432,16 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
                 + U.hexLong(reuseListRoot) + ", METASTORAGE_CACHE_ID=" + 
METASTORAGE_CACHE_ID);
     }
 
-    /** */
-    private void getOrAllocateMetas() throws IgniteCheckedException {
-        PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory();
+    /**
+     * Initializing the selected partition for use as MetaStorage
+     *
+     * @param partId Partition id.
+     * @return {@code true} if the partition is empty.
+     */
+    private boolean getOrAllocateMetas(int partId) throws 
IgniteCheckedException {
+        empty = false;
 
-        int partId = 0;
+        PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory();
 
         long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, 
partId);
         long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, 
partMetaId);
@@ -330,7 +453,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
                     if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
                         empty = true;
 
-                        return;
+                        return true;
                     }
 
                     PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
@@ -406,6 +529,8 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
         finally {
             pageMem.releasePage(METASTORAGE_CACHE_ID, partMetaId, 
partMetaPage);
         }
+
+        return false;
     }
 
     /**
@@ -451,8 +576,6 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     private void saveStoreMetadata() throws IgniteCheckedException {
         PageMemoryEx pageMem = (PageMemoryEx) pageMemory();
 
-        int partId = 0;
-
         long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, 
partId);
         long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, 
partMetaId);
 
@@ -498,7 +621,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
     }
 
     /** */
-    public static class FreeListImpl extends 
AbstractFreeList<MetastorageDataRow> {
+    public class FreeListImpl extends AbstractFreeList<MetastorageDataRow> {
         /** {@inheritDoc} */
         FreeListImpl(int cacheId, String name, DataRegionMetricsImpl 
regionMetrics, DataRegion dataRegion,
             ReuseList reuseList,
@@ -513,7 +636,7 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
 
         /** {@inheritDoc} */
         @Override protected long allocatePageNoReuse() throws 
IgniteCheckedException {
-            return pageMem.allocatePage(grpId, 
PageIdAllocator.METASTORE_PARTITION, FLAG_DATA);
+            return pageMem.allocatePage(grpId, partId, FLAG_DATA);
         }
 
         /**
@@ -598,4 +721,259 @@ public class MetaStorage implements DbCheckpointListener, 
ReadOnlyMetastorage, R
             return new MetastorageDataRow(link, key, incomplete.data());
         }
     }
+
+    /**
+     * Internal temporary storage: a closeable sink for (key, value) records that can later be
+     * read back as a stream.
+     */
+    private interface TmpStorageInternal extends Closeable {
+        /**
+         * Puts a record into the storage.
+         *
+         * @param key Key.
+         * @param val Value.
+         * @return {@code true} if the record was stored, {@code false} if this storage could not accept it.
+         * @throws IOException If the implementation fails to write the record.
+         */
+        boolean add(String key, byte[] val) throws IOException;
+
+        /**
+         * Reads the data from the storage.
+         *
+         * @return Stream of (key, value) tuples held by this storage.
+         * @throws IOException If the implementation fails to prepare the data for reading.
+         */
+        Stream<IgniteBiTuple<String, byte[]>> stream() throws IOException;
+    }
+
+    /**
+     * Temporary storage (memory): records are appended to a direct byte buffer of fixed capacity.
+     * Each record is laid out as {@code [int keyLen][int valLen][key bytes in UTF-8][value bytes]}.
+     */
+    private static class MemoryTmpStorage implements TmpStorageInternal {
+        /** Buffer holding the serialized records. */
+        final ByteBuffer buf;
+
+        /** Number of records added so far. */
+        int size;
+
+        /**
+         * @param size Capacity of the buffer, in bytes.
+         */
+        MemoryTmpStorage(int size) {
+            buf = ByteBuffer.allocateDirect(size);
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return {@code false}, leaving the buffer untouched, if the record does not fit into the
+         *      remaining space of the buffer; {@code true} otherwise.
+         */
+        @Override public boolean add(String key, byte[] val) {
+            byte[] keyData = key.getBytes(StandardCharsets.UTF_8);
+
+            // 8 bytes = the two int length prefixes (key length and value length).
+            if (val.length + keyData.length + 8 > buf.remaining())
+                return false;
+
            
buf.putInt(keyData.length).putInt(val.length).put(keyData).put(val);
+
+            size++;
+
+            return true;
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * NOTE(review): this flips the shared buffer and the returned stream consumes it lazily, so the
+         * method looks single-use; a second call, or an {@code add} after this call, would operate on
+         * a changed position/limit. Confirm that callers never do that.
+         */
+        @Override public Stream<IgniteBiTuple<String, byte[]>> stream() {
+            buf.flip();
+
+            return Stream.generate(() -> {
+                int keyLen = buf.getInt();
+                int dataLen = buf.getInt();
+
+                // One scratch array, sized for the larger of the two, serves both the key and the value.
+                byte[] tmpBuf = new byte[Math.max(keyLen, dataLen)];
+
+                buf.get(tmpBuf, 0, keyLen);
+
+                String key = new String(tmpBuf, 0, keyLen, 
StandardCharsets.UTF_8);
+
+                buf.get(tmpBuf, 0, dataLen);
+
+                // Trim the scratch array when the value is shorter than the key.
+                return new IgniteBiTuple<>(key, tmpBuf.length > dataLen ? 
Arrays.copyOf(tmpBuf, dataLen) : tmpBuf);
+            }).limit(size); // The generator is unbounded; stop after the number of records added.
+        }
+
+        /** {@inheritDoc} This implementation does nothing. */
+        @Override public void close() throws IOException {
+        }
+    }
+
+    /**
+     * Temporary storage (file): records are buffered in a direct byte buffer and written to a
+     * temporary file that is created lazily on the first {@code add}. Each record is laid out as
+     * {@code [int keyLen][int valLen][key bytes in UTF-8][value bytes]}.
+     *
+     * NOTE(review): nothing in this class deletes the temporary file after use.
+     * NOTE(review): a record larger than the buffer capacity would not fit into {@code cache} even
+     * after flushing; presumably records are expected to be smaller — confirm.
+     */
+    private static class FileTmpStorage implements TmpStorageInternal {
+        /** Buffer used both for batching writes and for reading the file back. */
+        final ByteBuffer cache = 
ByteBuffer.allocateDirect(TEMPORARY_METASTORAGE_BUFFER_SIZE);
+
+        /** Backing temporary file; {@code null} until the first record is added. */
+        RandomAccessFile file;
+
+        /** Number of records added so far. */
+        long size;
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Always {@code true}.
+         */
+        @Override public boolean add(String key, byte[] val) throws 
IOException {
+            if (file == null)
+                file = new RandomAccessFile(File.createTempFile("m_storage", 
"bin"), "rw");
+
+            byte[] keyData = key.getBytes(StandardCharsets.UTF_8);
+
+            // 8 bytes = the two int length prefixes; flush to the file when the record does not fit.
+            if (val.length + keyData.length + 8 > cache.remaining())
+                flushCache(false);
+
            
cache.putInt(keyData.length).putInt(val.length).put(keyData).put(val);
+
+            size++;
+
+            return true;
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * Flushes the pending records, rewinds the file and returns a lazy stream that refills
+         * {@code cache} from the file as it is consumed. Returns an empty stream if nothing was added.
+         */
+        @Override public Stream<IgniteBiTuple<String, byte[]>> stream() throws 
IOException {
+            if (file == null)
+                return Stream.empty();
+
+            flushCache(true);
+
+            file.getChannel().position(0);
+
+            readToCache();
+
+            return Stream.generate(() -> {
+                // Refill when at most 8 bytes (the size of the two length prefixes) are buffered.
+                if (cache.remaining() <= 8) {
+                    cache.compact();
+
+                    try {
+                        readToCache();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                int keyLen = cache.getInt();
+                int dataLen = cache.getInt();
+
+                // Refill when the key and value bytes of this record are not fully buffered.
+                if (cache.remaining() < keyLen + dataLen) {
+                    cache.compact();
+
+                    try {
+                        readToCache();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+
+                // One scratch array, sized for the larger of the two, serves both the key and the value.
+                byte[] tmpBuf = new byte[Math.max(keyLen, dataLen)];
+
+                cache.get(tmpBuf, 0, keyLen);
+
+                String key = new String(tmpBuf, 0, keyLen, 
StandardCharsets.UTF_8);
+
+                cache.get(tmpBuf, 0, dataLen);
+
+                // Trim the scratch array when the value is shorter than the key.
+                return new IgniteBiTuple<>(key, tmpBuf.length > dataLen ? 
Arrays.copyOf(tmpBuf, dataLen) : tmpBuf);
+            }).limit(size); // The generator is unbounded; stop after the number of records added.
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * NOTE(review): {@code file} is dereferenced without a null check; it is {@code null} until
+         * the first {@code add}.
+         */
+        @Override public void close() throws IOException {
+            file.close();
+        }
+
+        /**
+         * Reads from the current file position into the cache until the file is exhausted or the
+         * cache is full, then flips the cache for reading.
+         */
+        private void readToCache() throws IOException {
+            int len = (int)Math.min(file.length() - 
file.getChannel().position(), cache.remaining());
+
+            while (len > 0)
+                len -= file.getChannel().read(cache);
+
+            cache.flip();
+        }
+
+        /**
+         * Writes the buffered bytes (if any) to the file, clears the cache and forces the channel.
+         *
+         * @param force Passed to {@code FileChannel.force} as its metadata flag.
+         */
+        private void flushCache(boolean force) throws IOException {
+            if (cache.position() > 0) {
+                cache.flip();
+
+                while (cache.remaining() > 0)
+                    file.getChannel().write(cache);
+
+                cache.clear();
+            }
+
+            // The channel is forced even when there was nothing buffered to write.
+            file.getChannel().force(force);
+        }
+    }
+
+    /**
+     * Temporary storage: a chain of internal storages that starts with an in-memory one. When the
+     * current storage refuses a record, a file-based storage is appended and becomes current.
+     */
+    public static class TmpStorage implements Closeable {
+        /** Chain of internal storages, in creation order. */
+        final List<TmpStorageInternal> chain = new ArrayList<>(2);
+
+        /** Current internal storage, i.e. the one new records are added to. */
+        TmpStorageInternal current;
+
+        /** Logger. */
+        final IgniteLogger log;
+
+        /**
+         * @param memBufSize Memory buffer size.
+         * @param log Logger.
+         */
+        TmpStorage(int memBufSize, IgniteLogger log) {
+            this.log = log;
+
+            chain.add(current = new MemoryTmpStorage(memBufSize));
+        }
+
+        /**
+         * Puts a record into the current storage, switching to a new file-based storage if the
+         * current one refuses it.
+         *
+         * @param key Key.
+         * @param val Value.
+         * @throws IgniteCheckedException Wrapping an {@link IOException} thrown by the internal storage.
+         */
+        public void add(String key, byte[] val) throws IgniteCheckedException {
+            try {
+                // Retry on a fresh file-based storage for as long as the current one returns false.
+                while (!current.add(key, val))
+                    chain.add(current = new FileTmpStorage());
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+
+        /**
+         * Reads the data from the storage.
+         *
+         * @return Stream made of the streams of all internal storages, in chain order. An
+         *      {@link IOException} from an internal storage is rethrown wrapped in {@code IgniteException}.
+         */
+        public Stream<IgniteBiTuple<String, byte[]>> stream() {
+            return chain.stream().flatMap(storage -> {
+                try {
+                    return storage.stream();
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            });
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * Closes every internal storage. An {@link IOException} from one of them is logged and the
+         * remaining storages are still closed; the exception is not rethrown.
+         */
+        @Override public void close() throws IOException {
+            for (TmpStorageInternal storage : chain) {
+                try {
+                    storage.close();
+                }
+                catch (IOException ex) {
+                    log.error(ex.getMessage(), ex);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
index 5e2660b..2d7b0a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.processors.cache.persistence.Storable;
 
 /**
@@ -62,7 +63,7 @@ public class MetastorageDataRow implements 
MetastorageSearchRow, Storable {
     /** {@inheritDoc} */
     @Override
     public int partition() {
-        return 0;
+        return MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID ? 
PageIdAllocator.OLD_METASTORE_PARTITION: PageIdAllocator.METASTORE_PARTITION;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
index 420c51d..1db47be 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -44,6 +43,9 @@ public class MetastorageTree extends 
BPlusTree<MetastorageSearchRow, Metastorage
     /** */
     private MetastorageRowStore rowStore;
 
+    /** Partition id. */
+    private final int partId;
+
     /**
      * @param pageMem Page memory instance.
      * @param wal WAL manager.
@@ -63,12 +65,15 @@ public class MetastorageTree extends 
BPlusTree<MetastorageSearchRow, Metastorage
         MetastorageRowStore rowStore,
         long metaPageId,
         boolean initNew,
-        @Nullable FailureProcessor failureProcessor) throws 
IgniteCheckedException {
+        @Nullable FailureProcessor failureProcessor,
+        int partId) throws IgniteCheckedException {
         super("Metastorage", cacheId, pageMem, wal,
             globalRmvId, metaPageId, reuseList, MetastorageInnerIO.VERSIONS, 
MetastoreLeafIO.VERSIONS, failureProcessor);
 
         this.rowStore = rowStore;
 
+        this.partId = partId;
+
         initTree(initNew);
     }
 
@@ -99,7 +104,7 @@ public class MetastorageTree extends 
BPlusTree<MetastorageSearchRow, Metastorage
 
     /** {@inheritDoc} */
     @Override protected long allocatePageNoReuse() throws 
IgniteCheckedException {
-        return pageMem.allocatePage(grpId, 
PageIdAllocator.METASTORE_PARTITION, FLAG_DATA);
+        return pageMem.allocatePage(grpId, partId, FLAG_DATA);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
index 39faf5a..dba8486 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -231,7 +232,7 @@ public class IgnitePdsCorruptedStoreTest extends 
GridCommonAbstractTest {
 
         MetaStorage metaStorage = 
ignite.context().cache().context().database().metaStorage();
 
-        corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), 
METASTORAGE_CACHE_ID, 0);
+        corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), 
METASTORAGE_CACHE_ID, PageIdAllocator.METASTORE_PARTITION);
 
         stopGrid(0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
index 538b332..ee609c1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java
@@ -18,7 +18,15 @@
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -28,6 +36,7 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -147,12 +156,274 @@ public class IgniteMetaStorageBasicTest extends 
GridCommonAbstractTest {
 
                 metaStorage.remove(key);
 
-                metaStorage.putData(key, arr/*b.toString().getBytes()*/);
+                metaStorage.putData(key, arr);
             }
         }
         finally {
             db.checkpointReadUnlock();
         }
+
+        stopGrid();
+    }
+
+    /**
+     * Generates test entries via {@code generateTestData(size, from)} and puts each of them into the metastorage.
+     *
+     * @param metaStorage Meta storage to write to.
+     * @param size Number of entries to generate.
+     * @param from Value passed to the generator as the starting key index.
+     * @return Map of every key/value pair that was written.
+     * @throws IgniteCheckedException Declared by the method; presumably raised when a metastorage write fails.
+     */
+    private Map<String, byte[]> putDataToMetaStorage(MetaStorage metaStorage, 
int size, int from) throws IgniteCheckedException {
+        Map<String, byte[]> res = new HashMap<>();
+
+        for (Iterator<IgniteBiTuple<String, byte[]>> it = 
generateTestData(size, from).iterator(); it.hasNext(); ) {
+            IgniteBiTuple<String, byte[]> d = it.next();
+
+            metaStorage.putData(d.getKey(), d.getValue());
+
+            res.put(d.getKey(), d.getValue());
+        }
+
+        return res;
+    }
+
+    /**
+     * Tests data migration between metastorage partitions (delete partition case).
+     * <p>
+     * Writes entries in three batches across node restarts, toggling
+     * {@code MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID} in between, and finally verifies that
+     * every written entry is returned by {@code readAll()} with the same value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeletePartitionFromMetaStorageMigration() throws Exception 
{
+        final Map<String, byte[]> testData = new HashMap<>();
+
+        MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = true;
+
+        try {
+            IgniteEx ig = startGrid(0);
+
+            ig.cluster().active(true);
+
+            IgniteCacheDatabaseSharedManager db = 
ig.context().cache().context().database();
+
+            MetaStorage metaStorage = db.metaStorage();
+
+            assertNotNull(metaStorage);
+
+            db.checkpointReadLock();
+
+            // Batch 1 (key indexes 0..999), followed by a checkpoint.
+            try {
+                testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 0));
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+
+            db.waitForCheckpoint("Test");
+
+            ((GridCacheDatabaseSharedManager)db).enableCheckpoints(false);
+
+            db.checkpointReadLock();
+
+            // Batch 2 (key indexes 1000..1999) is written after checkpoints were disabled.
+            try {
+                testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 
1_000));
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+
+            stopGrid(0);
+
+            // Restart with the flag cleared and the checkpoint frequency set to 3600 * 1000.
+            MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
+
+            IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(0));
+
+            cfg.getDataStorageConfiguration().setCheckpointFrequency(3600 * 
1000L);
+
+            ig = (IgniteEx)startGrid(getTestIgniteInstanceName(0), 
optimize(cfg), null);
+
+            ig.cluster().active(true);
+
+            db = ig.context().cache().context().database();
+
+            metaStorage = db.metaStorage();
+
+            assertNotNull(metaStorage);
+
+            db.checkpointReadLock();
+
+            // Batch 3 (key indexes 2000..2999), followed by a checkpoint.
+            try {
+                testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 
2_000));
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+
+            db.waitForCheckpoint("Test");
+
+            stopGrid(0);
+
+            // Final restart: every entry from all three batches must be readable.
+            ig = startGrid(0);
+
+            ig.cluster().active(true);
+
+            db = ig.context().cache().context().database();
+
+            metaStorage = db.metaStorage();
+
+            assertNotNull(metaStorage);
+
+            db.checkpointReadLock();
+            try {
+                Collection<IgniteBiTuple<String, byte[]>> read = 
metaStorage.readAll();
+
+                int cnt = 0;
+                for (IgniteBiTuple<String, byte[]> r : read) {
+                    byte[] test = testData.get(r.get1());
+
+                    // Entries not written by this test are ignored.
+                    if (test != null) {
+                        // Expected value first, actual value second.
+                        Assert.assertArrayEquals(test, r.get2());
+
+                        cnt++;
+                    }
+                }
+
+                assertEquals(testData.size(), cnt);
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+        finally {
+            // Always reset the static flag so that it does not leak into other tests.
+            MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
+        }
+
+    }
+
+    /**
+     * Tests data migration between metastorage partitions.
+     * <p>
+     * Writes generated entries while {@code MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID} is
+     * {@code true}, restarts the node with the flag set to {@code false} and verifies that every written
+     * entry is returned by {@code readAll()} with the same value.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMetaStorageMigration() throws Exception {
+        final Map<String, byte[]> testData = new HashMap<>(5_000);
+
+        // A negative start index makes the generator use random keys; the map collapses any duplicates.
+        generateTestData(5_000, -1).forEach(t -> testData.put(t.get1(), 
t.get2()));
+
+        MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = true;
+
+        try {
+            IgniteEx ig = startGrid(0);
+
+            ig.cluster().active(true);
+
+            IgniteCacheDatabaseSharedManager db = 
ig.context().cache().context().database();
+
+            MetaStorage metaStorage = db.metaStorage();
+
+            assertNotNull(metaStorage);
+
+            db.checkpointReadLock();
+
+            try {
+                for (Map.Entry<String, byte[]> v : testData.entrySet())
+                    metaStorage.putData(v.getKey(), v.getValue());
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+
+            stopGrid(0);
+
+            // Restart with the flag cleared and read everything back.
+            MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
+
+            ig = startGrid(0);
+
+            ig.cluster().active(true);
+
+            db = ig.context().cache().context().database();
+
+            metaStorage = db.metaStorage();
+
+            assertNotNull(metaStorage);
+
+            db.checkpointReadLock();
+
+            try {
+                Collection<IgniteBiTuple<String, byte[]>> read = 
metaStorage.readAll();
+
+                int cnt = 0;
+                for (IgniteBiTuple<String, byte[]> r : read) {
+                    byte[] test = testData.get(r.get1());
+
+                    // Entries not written by this test are ignored.
+                    if (test != null) {
+                        // Expected value first, actual value second.
+                        Assert.assertArrayEquals(test, r.get2());
+
+                        cnt++;
+                    }
+                }
+
+                assertEquals(testData.size(), cnt);
+            }
+            finally {
+                db.checkpointReadUnlock();
+            }
+        }
+        finally {
+            // Always reset the static flag so that it does not leak into other tests.
+            MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false;
+        }
+    }
+
+    /**
+     * Tests the temporary storage: entries added to {@code MetaStorage.TmpStorage} must be streamed back
+     * in the same order and with the same content, for two different values of the first constructor argument.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMetaStoreMigrationTmpStorage() throws Exception {
+        List<IgniteBiTuple<String, byte[]>> data = generateTestData(2_000, 
-1).collect(Collectors.toList());
+
+        // memory (NOTE(review): 4 * 1024 * 1024 is presumably a size limit large enough to keep all data in memory - confirm)
+        try (MetaStorage.TmpStorage tmpStorage = new MetaStorage.TmpStorage(4 
* 1024 * 1024, log)) {
+            for (IgniteBiTuple<String, byte[]> item : data)
+                tmpStorage.add(item.get1(), item.get2());
+
+            compare(tmpStorage.stream().iterator(), data.iterator());
+        }
+
+        // file (NOTE(review): 4 * 1024 is presumably small enough to force the storage to spill to a file - confirm)
+        try (MetaStorage.TmpStorage tmpStorage = new MetaStorage.TmpStorage(4 
* 1024, log)) {
+            for (IgniteBiTuple<String, byte[]> item : data)
+                tmpStorage.add(item.get1(), item.get2());
+
+            compare(tmpStorage.stream().iterator(), data.iterator());
+        }
+    }
+
+    /**
+     * Builds a stream of {@code size} test entries, each with a 1024-byte random value.
+     * <p>
+     * Keys have the form {@code "KEY_" + n}: when {@code fromKey} is negative, {@code n} is a random int
+     * (so keys are not guaranteed to be unique); otherwise {@code n} counts up from {@code fromKey}.
+     *
+     * @param size Number of entries in the stream.
+     * @param fromKey First key index, or a negative value for random keys.
+     * @return Stream of generated key/value tuples.
+     */
+    private static Stream<IgniteBiTuple<String, byte[]>> generateTestData(int 
size, int fromKey) {
+        final AtomicInteger idx = new AtomicInteger(fromKey);
+        final Random rnd = new Random();
+
+        return Stream.generate(() -> {
+            byte[] val = new byte[1024];
+
+            rnd.nextBytes(val);
+
+            return new IgniteBiTuple<>("KEY_" + (fromKey < 0 ? rnd.nextInt() : 
idx.getAndIncrement()), val);
+        }).limit(size);
+    }
+
+    /**
+     * Asserts that two iterators produce the same sequence of key/value pairs.
+     * <p>
+     * Both iterators must be exhausted at the same step; at every step the keys must be equal and the
+     * value arrays must have equal content.
+     *
+     * @param it First iterator.
+     * @param it1 Second iterator.
+     */
+    private static void compare(Iterator<IgniteBiTuple<String, byte[]>> it, 
Iterator<IgniteBiTuple<String, byte[]>> it1) {
+        while (true) {
+            Assert.assertEquals(it.hasNext(), it1.hasNext());
+
+            if (!it.hasNext())
+                break;
+
+            IgniteBiTuple<String, byte[]> i = it.next();
+            IgniteBiTuple<String, byte[]> i1 = it1.next();
+
+            // Compare the key of one tuple against the key of the other, not against itself.
+            Assert.assertEquals(i.get1(), i1.get1());
+
+            Assert.assertArrayEquals(i.get2(), i1.get2());
+        }
+    }
 
     /**

Reply via email to