IGNITE-10348 Safely recreate metastore to mitigate IGNITE-8735 Signed-off-by: Ivan Rakov <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86f92d96 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86f92d96 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86f92d96 Branch: refs/heads/ignite-10189 Commit: 86f92d9697e6bb1f138b5d2e477d7eb5cf8c8814 Parents: 137a0e7 Author: Alexey Stelmak <[email protected]> Authored: Fri Dec 7 18:56:48 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Fri Dec 7 18:56:48 2018 +0300 ---------------------------------------------------------------------- .../internal/pagemem/PageIdAllocator.java | 5 +- .../processors/cache/GridCacheProcessor.java | 18 + .../persistence/file/FilePageStoreManager.java | 8 +- .../persistence/metastorage/MetaStorage.java | 406 ++++++++++++++++++- .../metastorage/MetastorageDataRow.java | 3 +- .../metastorage/MetastorageTree.java | 11 +- .../IgnitePdsCorruptedStoreTest.java | 3 +- .../metastorage/IgniteMetaStorageBasicTest.java | 273 ++++++++++++- 8 files changed, 702 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java index b3e4b07..d91d31d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageIdAllocator.java @@ -35,8 +35,11 @@ public interface PageIdAllocator { /** Special partition reserved for index space. */ public static final int INDEX_PARTITION = 0xFFFF; + /** Legacy partition that held metastore data before it was moved to {@link #METASTORE_PARTITION}.
*/ + public static final int OLD_METASTORE_PARTITION = 0x0; + /** Special partition reserved for metastore space. */ - public static final int METASTORE_PARTITION = 0x0; + public static final int METASTORE_PARTITION = 0x1; /** * Allocates a page from the space for the given partition ID and the given flags. http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 375dd12..3e32ead 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -105,6 +105,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; @@ -277,6 +278,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Cache recovery lifecycle state and actions. */ private final CacheRecoveryLifecycle recovery = new CacheRecoveryLifecycle(); + /** Temporary storage that holds metastorage data while it is migrated to a new partition.
*/ + private MetaStorage.TmpStorage tmpStorage; + /** * @param ctx Kernal context. */ @@ -5471,6 +5475,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Temporary storage used for metastorage migration, or {@code null} if none is set. + */ + public MetaStorage.TmpStorage getTmpStorage() { + return tmpStorage; + } + + /** + * @param tmpStorage Temporary storage used for metastorage migration, or {@code null} to clear it. + */ + public void setTmpStorage(MetaStorage.TmpStorage tmpStorage) { + this.tmpStorage = tmpStorage; + } + + /** * Recovery lifecycle for caches. */ private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener { http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index e80107f..5a4eb8b 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -374,10 +374,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen CacheStoreHolder holder = initDir( new File(storeWorkDir, META_STORAGE_NAME), - grpId, - 1, - dataRegion.memoryMetrics(), - false); + grpId, + PageIdAllocator.METASTORE_PARTITION + 1, + dataRegion.memoryMetrics(), + false); CacheStoreHolder old = idxCacheStores.put(grpId, holder); http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java ---------------------------------------------------------------------- diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java index a61bf64..7ff8257 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java @@ -17,13 +17,24 @@ package org.apache.ignite.internal.processors.cache.persistence.metastorage; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -32,10 +43,11 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import
org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IncompleteObject; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; @@ -44,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListe import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.RootPage; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; @@ -58,12 +71,14 @@ import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.NotNull; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.OLD_METASTORE_PARTITION; import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId; import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; @@ -77,9 +92,19 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R /** */ public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME); + /** This flag is used ONLY FOR TESTING the migration of a metastorage from
Part 0 to Part 1. */ + public static boolean PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false; + /** Marker for removed entry. */ private static final byte[] TOMBSTONE = new byte[0]; + /** Temporary metastorage memory size. */ + private static final int TEMPORARY_METASTORAGE_IN_MEMORY_SIZE = 128 * 1024 * 1024; + + /** Temporary metastorage buffer size (file). */ + private static final int TEMPORARY_METASTORAGE_BUFFER_SIZE = 1024 * 1024; + + /** */ private final IgniteWriteAheadLogManager wal; @@ -99,7 +124,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R private DataRegionMetricsImpl regionMetrics; /** */ - private boolean readOnly; + private final boolean readOnly; /** */ private boolean empty; @@ -122,6 +147,12 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R /** */ private final FailureProcessor failureProcessor; + /** Partition id. */ + private int partId; + + /** Cctx. */ + private final GridCacheSharedContext cctx; + /** */ public MetaStorage( GridCacheSharedContext cctx, @@ -129,6 +160,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R DataRegionMetricsImpl regionMetrics, boolean readOnly ) { + this.cctx = cctx; wal = cctx.wal(); this.dataRegion = dataRegion; this.regionMetrics = regionMetrics; @@ -138,10 +170,79 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R } /** */ - public void init(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException { + public void init(GridCacheDatabaseSharedManager db) throws IgniteCheckedException { regionMetrics.clear(); + initInternal(db); + + if (!PRESERVE_LEGACY_METASTORAGE_PARTITION_ID) { + GridCacheProcessor gcProcessor = cctx.kernalContext().cache(); + + if (partId == OLD_METASTORE_PARTITION) + gcProcessor.setTmpStorage(copyDataToTmpStorage()); + else if (gcProcessor.getTmpStorage() != null) { + restoreDataFromTmpStorage(gcProcessor.getTmpStorage()); +
+ gcProcessor.setTmpStorage(null); + + // remove old partitions + CacheGroupContext cgc = cctx.cache().cacheGroup(METASTORAGE_CACHE_ID); - getOrAllocateMetas(); + if (cgc != null) { + db.schedulePartitionDestroy(METASTORAGE_CACHE_ID, OLD_METASTORE_PARTITION); + + db.schedulePartitionDestroy(METASTORAGE_CACHE_ID, PageIdAllocator.INDEX_PARTITION); + } + } + } + } + + /** + * Copies every entry of the metastorage tree into a new temporary storage. + * + * @return Target temporary storage + */ + private TmpStorage copyDataToTmpStorage() throws IgniteCheckedException { + TmpStorage tmpStorage = new TmpStorage(TEMPORARY_METASTORAGE_IN_MEMORY_SIZE, log); + + GridCursor<MetastorageDataRow> cur = tree.find(null, null); + + while (cur.next()) { + MetastorageDataRow row = cur.get(); + + tmpStorage.add(row.key(), row.value()); + } + + return tmpStorage; + } + + /** + * Writes every entry of the temporary storage into this metastorage, then closes the temporary storage. + * + * @param tmpStorage temporary storage; closed by this method even if writing an entry fails. + */ + private void restoreDataFromTmpStorage(TmpStorage tmpStorage) throws IgniteCheckedException { + try (TmpStorage storage = tmpStorage) { + for (Iterator<IgniteBiTuple<String, byte[]>> it = storage.stream().iterator(); it.hasNext(); ) { + IgniteBiTuple<String, byte[]> t = it.next(); + + putData(t.get1(), t.get2()); + } + } + catch (IOException e) { + // Only close() can throw IOException here, so every entry has already been written: + // a failure to release the temporary resources is logged rather than rethrown. + log.error(e.getMessage(), e); + } + } + + /** + * @param db Database.
+ */ + private void initInternal(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException { + if (PRESERVE_LEGACY_METASTORAGE_PARTITION_ID) + getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION); + else if (!readOnly || getOrAllocateMetas(partId = PageIdAllocator.OLD_METASTORE_PARTITION)) + getOrAllocateMetas(partId = PageIdAllocator.METASTORE_PARTITION); if (!empty) { freeList = new FreeListImpl(METASTORAGE_CACHE_ID, "metastorage", @@ -151,7 +252,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R MetastorageRowStore rowStore = new MetastorageRowStore(freeList, db); tree = new MetastorageTree(METASTORAGE_CACHE_ID, dataRegion.pageMemory(), wal, rmvId, - freeList, rowStore, treeRoot.pageId().pageId(), treeRoot.isAllocated(), failureProcessor); + freeList, rowStore, treeRoot.pageId().pageId(), treeRoot.isAllocated(), failureProcessor, partId); if (!readOnly) ((GridCacheDatabaseSharedManager)db).addCheckpointListener(this); @@ -226,6 +327,23 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R return res; } + /** + * Read all items from metastore.
+ */ + public Collection<IgniteBiTuple<String, byte[]>> readAll() throws IgniteCheckedException { + ArrayList<IgniteBiTuple<String, byte[]>> res = new ArrayList<>(); + + GridCursor<MetastorageDataRow> cur = tree.find(null, null); + + while (cur.next()) { + MetastorageDataRow row = cur.get(); + + res.add(new IgniteBiTuple<>(row.key(), row.value())); + } + + return res; + } + /** {@inheritDoc} */ @Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException { assert val != null; @@ -314,11 +432,16 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R + U.hexLong(reuseListRoot) + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID); } - /** */ - private void getOrAllocateMetas() throws IgniteCheckedException { - PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory(); + /** + * Initializes the selected partition for use as the metastorage. + * + * @param partId Partition id. + * @return {@code true} if the partition is empty (it has no partition meta page). + */ + private boolean getOrAllocateMetas(int partId) throws IgniteCheckedException { + empty = false; - int partId = 0; + PageMemoryEx pageMem = (PageMemoryEx)dataRegion.pageMemory(); long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId); long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId); @@ -330,7 +453,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R if (PageIO.getType(pageAddr) != PageIO.T_PART_META) { empty = true; - return; + return true; } PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); @@ -406,6 +529,8 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R finally { pageMem.releasePage(METASTORAGE_CACHE_ID, partMetaId, partMetaPage); } + + return false; } /** @@ -451,8 +576,6 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R private void saveStoreMetadata() throws IgniteCheckedException { PageMemoryEx pageMem = (PageMemoryEx)
pageMemory(); - int partId = 0; - long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId); long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId); @@ -498,7 +621,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R } /** */ - public static class FreeListImpl extends AbstractFreeList<MetastorageDataRow> { + public class FreeListImpl extends AbstractFreeList<MetastorageDataRow> { /** {@inheritDoc} */ FreeListImpl(int cacheId, String name, DataRegionMetricsImpl regionMetrics, DataRegion dataRegion, ReuseList reuseList, @@ -513,7 +636,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R /** {@inheritDoc} */ @Override protected long allocatePageNoReuse() throws IgniteCheckedException { - return pageMem.allocatePage(grpId, PageIdAllocator.METASTORE_PARTITION, FLAG_DATA); + return pageMem.allocatePage(grpId, partId, FLAG_DATA); } /** @@ -598,4 +721,314 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R return new MetastorageDataRow(link, key, incomplete.data()); } } + + /** + * Temporary storage internal + */ + private interface TmpStorageInternal extends Closeable { + /** + * Put data + * + * @param key Key. + * @param val Value. + */ + boolean add(String key, byte[] val) throws IOException; + + /** + * Read data from storage + */ + Stream<IgniteBiTuple<String, byte[]>> stream() throws IOException; + } + + /** + * Temporary storage (memory) + */ + private static class MemoryTmpStorage implements TmpStorageInternal { + /** Buffer. */ + final ByteBuffer buf; + + /** Size. */ + int size; + + /** + * @param size Size.
+ */ + MemoryTmpStorage(int size) { + buf = ByteBuffer.allocateDirect(size); + } + + /** {@inheritDoc} */ + @Override public boolean add(String key, byte[] val) { + byte[] keyData = key.getBytes(StandardCharsets.UTF_8); + + if (val.length + keyData.length + 8 > buf.remaining()) + return false; + + buf.putInt(keyData.length).putInt(val.length).put(keyData).put(val); + + size++; + + return true; + } + + /** {@inheritDoc} */ + @Override public Stream<IgniteBiTuple<String, byte[]>> stream() { + buf.flip(); + + return Stream.generate(() -> { + int keyLen = buf.getInt(); + int dataLen = buf.getInt(); + + byte[] tmpBuf = new byte[Math.max(keyLen, dataLen)]; + + buf.get(tmpBuf, 0, keyLen); + + String key = new String(tmpBuf, 0, keyLen, StandardCharsets.UTF_8); + + buf.get(tmpBuf, 0, dataLen); + + return new IgniteBiTuple<>(key, tmpBuf.length > dataLen ? Arrays.copyOf(tmpBuf, dataLen) : tmpBuf); + }).limit(size); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + } + } + + /** + * Temporary storage (file). Records may be larger than the I/O buffer: their bytes are written and read + * through {@link #cache} in buffer-sized chunks. + */ + private static class FileTmpStorage implements TmpStorageInternal { + /** Cache. */ + final ByteBuffer cache = ByteBuffer.allocateDirect(TEMPORARY_METASTORAGE_BUFFER_SIZE); + + /** Temporary file behind {@link #file}; deleted on {@link #close()}. */ + File tmpFile; + + /** File. */ + RandomAccessFile file; + + /** Size.
*/ + long size; + + /** {@inheritDoc} */ + @Override public boolean add(String key, byte[] val) throws IOException { + if (file == null) { + tmpFile = File.createTempFile("m_storage", ".bin"); + + file = new RandomAccessFile(tmpFile, "rw"); + } + + byte[] keyData = key.getBytes(StandardCharsets.UTF_8); + + if (cache.remaining() < 8) + flushCache(false); + + cache.putInt(keyData.length).putInt(val.length); + + // Key and value may exceed the buffer capacity, so they are copied chunk by chunk. + write(keyData); + write(val); + + size++; + + return true; + } + + /** {@inheritDoc} */ + @Override public Stream<IgniteBiTuple<String, byte[]>> stream() throws IOException { + if (file == null) + return Stream.empty(); + + flushCache(true); + + file.getChannel().position(0); + + readToCache(); + + return Stream.generate(() -> { + try { + ensureAvailable(8); + + int keyLen = cache.getInt(); + int dataLen = cache.getInt(); + + String key = new String(read(keyLen), StandardCharsets.UTF_8); + + return new IgniteBiTuple<>(key, read(dataLen)); + } + catch (IOException e) { + throw new IgniteException(e); + } + }).limit(size); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + try { + if (file != null) + file.close(); + } + finally { + // Do not leave copied metastorage data behind in the temporary directory. + if (tmpFile != null && !tmpFile.delete()) + tmpFile.deleteOnExit(); + } + } + + /** + * Writes {@code data} through {@link #cache}, flushing the cache to the file whenever it is full. + * + * @param data Bytes to write. + */ + private void write(byte[] data) throws IOException { + for (int off = 0; off < data.length; ) { + if (!cache.hasRemaining()) + flushCache(false); + + int cnt = Math.min(data.length - off, cache.remaining()); + + cache.put(data, off, cnt); + + off += cnt; + } + } + + /** + * Reads exactly {@code len} bytes, refilling {@link #cache} from the file as many times as needed. + * + * @param len Number of bytes to read. + * @return Read bytes. 
+ */ + private byte[] read(int len) throws IOException { + byte[] res = new byte[len]; + + for (int off = 0; off < len; ) { + ensureAvailable(1); + + int cnt = Math.min(len - off, cache.remaining()); + + cache.get(res, off, cnt); + + off += cnt; + } + + return res; + } + + /** + * Ensures at least {@code cnt} bytes can be read from {@link #cache}, refilling it from the file if needed. + * + * @param cnt Required number of bytes, not greater than the cache capacity. + * @throws IOException If the file ends before {@code cnt} bytes are available. + */ + private void ensureAvailable(int cnt) throws IOException { + if (cache.remaining() < cnt) { + cache.compact(); + + readToCache(); + + if (cache.remaining() < cnt) + throw new IOException("Unexpected end of temporary metastorage file: " + tmpFile); + } + } + + /** + * Read data to cache + */ + private void readToCache() throws IOException { + int len = (int)Math.min(file.length() - file.getChannel().position(), cache.remaining()); + + while (len > 0) + len -= file.getChannel().read(cache); + + cache.flip(); + } + + /** + * Write cache to file. + * + * @param force force metadata.
+ */ + private void flushCache(boolean force) throws IOException { + if (cache.position() > 0) { + cache.flip(); + + while (cache.remaining() > 0) + file.getChannel().write(cache); + + cache.clear(); + } + + file.getChannel().force(force); + } + } + + /** + * Temporary storage + */ + public static class TmpStorage implements Closeable { + /** Chain of internal storages. */ + final List<TmpStorageInternal> chain = new ArrayList<>(2); + + /** Current internal storage. */ + TmpStorageInternal current; + + /** Logger. */ + final IgniteLogger log; + + /** + * @param memBufSize Memory buffer size. + * @param log Logger. + */ + TmpStorage(int memBufSize, IgniteLogger log) { + this.log = log; + + chain.add(current = new MemoryTmpStorage(memBufSize)); + } + + /** + * Put data + * + * @param key Key. + * @param val Value. + */ + public void add(String key, byte[] val) throws IgniteCheckedException { + try { + while (!current.add(key, val)) + chain.add(current = new FileTmpStorage()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Read data from storage + */ + public Stream<IgniteBiTuple<String, byte[]>> stream() { + return chain.stream().flatMap(storage -> { + try { + return storage.stream(); + } + catch (IOException e) { + throw new IgniteException(e); + } + }); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + for (TmpStorageInternal storage : chain) { + try { + storage.close(); + } + catch (IOException ex) { + log.error(ex.getMessage(), ex); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java index 5e2660b..2d7b0a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.metastorage; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.processors.cache.persistence.Storable; /** @@ -62,7 +63,8 @@ public class MetastorageDataRow implements MetastorageSearchRow, Storable { /** {@inheritDoc} */ @Override public int partition() { - return 0; + return MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID ? + PageIdAllocator.OLD_METASTORE_PARTITION : PageIdAllocator.METASTORE_PARTITION; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java index 420c51d..1db47be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.metastorage; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; -import
org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; @@ -44,6 +43,9 @@ public class MetastorageTree extends BPlusTree<MetastorageSearchRow, Metastorage /** */ private MetastorageRowStore rowStore; + /** Partition id. */ + private final int partId; + /** * @param pageMem Page memory instance. * @param wal WAL manager. @@ -63,12 +65,15 @@ public class MetastorageTree extends BPlusTree<MetastorageSearchRow, Metastorage MetastorageRowStore rowStore, long metaPageId, boolean initNew, - @Nullable FailureProcessor failureProcessor) throws IgniteCheckedException { + @Nullable FailureProcessor failureProcessor, + int partId) throws IgniteCheckedException { super("Metastorage", cacheId, pageMem, wal, globalRmvId, metaPageId, reuseList, MetastorageInnerIO.VERSIONS, MetastoreLeafIO.VERSIONS, failureProcessor); this.rowStore = rowStore; + this.partId = partId; + initTree(initNew); } @@ -99,7 +104,7 @@ public class MetastorageTree extends BPlusTree<MetastorageSearchRow, Metastorage /** {@inheritDoc} */ @Override protected long allocatePageNoReuse() throws IgniteCheckedException { - return pageMem.allocatePage(grpId, PageIdAllocator.METASTORE_PARTITION, FLAG_DATA); + return pageMem.allocatePage(grpId, partId, FLAG_DATA); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java index 39faf5a..dba8486 100644 ---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCorruptedStoreTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; @@ -231,7 +232,7 @@ public class IgnitePdsCorruptedStoreTest extends GridCommonAbstractTest { MetaStorage metaStorage = ignite.context().cache().context().database().metaStorage(); - corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID, 0); + corruptTreeRoot(ignite, (PageMemoryEx)metaStorage.pageMemory(), METASTORAGE_CACHE_ID, PageIdAllocator.METASTORE_PARTITION); stopGrid(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/86f92d96/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java index 538b332..ee609c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/IgniteMetaStorageBasicTest.java @@ -18,7 +18,15 @@ 
package org.apache.ignite.internal.processors.cache.persistence.metastorage; import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataRegionConfiguration; @@ -28,6 +36,7 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -147,12 +156,274 @@ public class IgniteMetaStorageBasicTest extends GridCommonAbstractTest { metaStorage.remove(key); - metaStorage.putData(key, arr/*b.toString().getBytes()*/); + metaStorage.putData(key, arr); } } finally { db.checkpointReadUnlock(); } + + stopGrid(); + } + + /** + * @param metaStorage Meta storage. + * @param size Number of entries to put; keys are KEY_n counting up from {@code from}, or random if {@code from} is negative. 
+ */ + private Map<String, byte[]> putDataToMetaStorage(MetaStorage metaStorage, int size, int from) throws IgniteCheckedException { + Map<String, byte[]> res = new HashMap<>(); + + for (Iterator<IgniteBiTuple<String, byte[]>> it = generateTestData(size, from).iterator(); it.hasNext(); ) { + IgniteBiTuple<String, byte[]> d = it.next(); + + metaStorage.putData(d.getKey(), d.getValue()); + + res.put(d.getKey(), d.getValue()); + } + + return res; + } + + /** + * Testing data migration between metastorage partitions (delete partition case) + */ + public void testDeletePartitionFromMetaStorageMigration() throws Exception { + final Map<String, byte[]> testData = new HashMap<>(); + + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = true; + + try { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteCacheDatabaseSharedManager db = ig.context().cache().context().database(); + + MetaStorage metaStorage = db.metaStorage(); + + assertNotNull(metaStorage); + + db.checkpointReadLock(); + + try { + testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 0)); + } + finally { + db.checkpointReadUnlock(); + } + + db.waitForCheckpoint("Test"); + + ((GridCacheDatabaseSharedManager)db).enableCheckpoints(false); + + db.checkpointReadLock(); + + try { + testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 1_000)); + } + finally { + db.checkpointReadUnlock(); + } + + stopGrid(0); + + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false; + + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)); + + cfg.getDataStorageConfiguration().setCheckpointFrequency(3600 * 1000L); + + ig = (IgniteEx)startGrid(getTestIgniteInstanceName(0), optimize(cfg), null); + + ig.cluster().active(true); + + db = ig.context().cache().context().database(); + + metaStorage = db.metaStorage(); + + assertNotNull(metaStorage); + + db.checkpointReadLock(); + + try { + testData.putAll(putDataToMetaStorage(metaStorage, 1_000, 2_000)); + } + finally { + 
+ db.checkpointReadUnlock(); + } + + db.waitForCheckpoint("Test"); + + stopGrid(0); + + ig = startGrid(0); + + ig.cluster().active(true); + + db = ig.context().cache().context().database(); + + metaStorage = db.metaStorage(); + + assertNotNull(metaStorage); + + db.checkpointReadLock(); + try { + Collection<IgniteBiTuple<String, byte[]>> read = metaStorage.readAll(); + + int cnt = 0; + for (IgniteBiTuple<String, byte[]> r : read) { + byte[] test = testData.get(r.get1()); + + if (test != null) { + Assert.assertArrayEquals(test, r.get2()); + + cnt++; + } + } + + assertEquals(testData.size(), cnt); + } + finally { + db.checkpointReadUnlock(); + } + } + finally { + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false; + } + + } + + /** + * Testing data migration between metastorage partitions + */ + public void testMetaStorageMigration() throws Exception { + final Map<String, byte[]> testData = new HashMap<>(5_000); + + generateTestData(5_000, -1).forEach(t -> testData.put(t.get1(), t.get2())); + + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = true; + + try { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + IgniteCacheDatabaseSharedManager db = ig.context().cache().context().database(); + + MetaStorage metaStorage = db.metaStorage(); + + assertNotNull(metaStorage); + + db.checkpointReadLock(); + + try { + for (Map.Entry<String, byte[]> v : testData.entrySet()) + metaStorage.putData(v.getKey(), v.getValue()); + } + finally { + db.checkpointReadUnlock(); + } + + stopGrid(0); + + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false; + + ig = startGrid(0); + + ig.cluster().active(true); + + db = ig.context().cache().context().database(); + + metaStorage = db.metaStorage(); + + assertNotNull(metaStorage); + + db.checkpointReadLock(); + + try { + Collection<IgniteBiTuple<String, byte[]>> read = metaStorage.readAll(); + + int cnt = 0; + for (IgniteBiTuple<String, byte[]> r : read) { + byte[] test = testData.get(r.get1()); + + if (test != 
null) { + Assert.assertArrayEquals(test, r.get2()); + + cnt++; + } + } + + assertEquals(testData.size(), cnt); + } + finally { + db.checkpointReadUnlock(); + } + } + finally { + MetaStorage.PRESERVE_LEGACY_METASTORAGE_PARTITION_ID = false; + } + } + + /** + * Testing temporary storage + */ + public void testMetaStoreMigrationTmpStorage() throws Exception { + List<IgniteBiTuple<String, byte[]>> data = generateTestData(2_000, -1).collect(Collectors.toList()); + + // memory + try (MetaStorage.TmpStorage tmpStorage = new MetaStorage.TmpStorage(4 * 1024 * 1024, log)) { + for (IgniteBiTuple<String, byte[]> item : data) + tmpStorage.add(item.get1(), item.get2()); + + compare(tmpStorage.stream().iterator(), data.iterator()); + } + + // file + try (MetaStorage.TmpStorage tmpStorage = new MetaStorage.TmpStorage(4 * 1024, log)) { + for (IgniteBiTuple<String, byte[]> item : data) + tmpStorage.add(item.get1(), item.get2()); + + compare(tmpStorage.stream().iterator(), data.iterator()); + } + } + + /** + * Generates {@code size} entries with 1 KB random values; keys are KEY_n counting up from {@code fromKey}, or random ints if {@code fromKey} is negative. + */ + private static Stream<IgniteBiTuple<String, byte[]>> generateTestData(int size, int fromKey) { + final AtomicInteger idx = new AtomicInteger(fromKey); + final Random rnd = new Random(); + + return Stream.generate(() -> { + byte[] val = new byte[1024]; + + rnd.nextBytes(val); + + return new IgniteBiTuple<>("KEY_" + (fromKey < 0 ? rnd.nextInt() : idx.getAndIncrement()), val); + }).limit(size); + } + + /** + * Asserts that both iterators yield the same number of tuples with equal keys and equal values, in the same order. + * + * @param it Iterator under test (actual values). + * @param it1 Iterator over the expected values. 
+ */ + private static void compare(Iterator<IgniteBiTuple<String, byte[]>> it, Iterator<IgniteBiTuple<String, byte[]>> it1) { + while (true) { + Assert.assertEquals(it1.hasNext(), it.hasNext()); + + if (!it.hasNext()) + break; + + IgniteBiTuple<String, byte[]> i = it.next(); + IgniteBiTuple<String, byte[]> i1 = it1.next(); + + Assert.assertEquals(i1.get1(), i.get1()); + + Assert.assertArrayEquals(i1.get2(), i.get2()); + } } /**
