Repository: ignite Updated Branches: refs/heads/master 01f60542e -> 89c775737 Updated Tags: refs/tags/1.0.0-RELEASE-TEST-RC1 [created] a37da05fb refs/tags/1.0.0-RELEASE-TEST-RC2 [created] 16ba1421d refs/tags/1.0.0-RELEASE-TEST-RC3 [created] 1b3c8184d refs/tags/1.0.0-RELEASE-TEST-RC4 [created] d9ab06450 refs/tags/1.0.1-RELEASE-TEST-RC5 [created] eee94fa28 refs/tags/1.0.1-RELEASE-TEST-RC6 [created] 4de9e8dc5 refs/tags/1.10.0.ea1 [created] 0dae7316d refs/tags/1.10.0.ea2 [created] ab4808545 refs/tags/1.10.0.ea3 [created] 62df82653 refs/tags/1.10.0.ea5 [created] 30460d2a5 refs/tags/1.10.1.ea4 [created] 8c83d167d refs/tags/1.5.1-QAVS1901 [created] 5c8283571 refs/tags/1.5.10 [created] 4bf9edd68 refs/tags/1.5.11 [created] 460f0078f refs/tags/1.5.12 [created] fb675772d refs/tags/1.5.14 [created] 1362553b4 refs/tags/1.5.15 [created] 78e671138 refs/tags/1.5.16 [created] b39f54f21 refs/tags/1.5.17 [created] d4de1cc0f refs/tags/1.5.18 [created] 67b8c1ae2 refs/tags/1.5.19 [created] 953e9db0d refs/tags/1.5.20 [created] fca3c78c7 refs/tags/1.5.21 [created] bd1916bfe refs/tags/1.5.22 [created] 597ea0cf5 refs/tags/1.5.23 [created] f2832c6ff refs/tags/1.5.24 [created] 692cdf3e4 refs/tags/1.5.25 [created] c1a354a7a refs/tags/1.5.26 [created] 0d4dbc416 refs/tags/1.5.27 [created] 847b27814 refs/tags/1.5.28 [created] 55dbc8a62 refs/tags/1.5.29 [created] a1980c6f6 refs/tags/1.5.30 [created] ae7e6cc95 refs/tags/1.5.31 [created] 767f88bcf refs/tags/1.5.32 [created] f57631d98 refs/tags/1.5.33 [created] 1986b93b2 refs/tags/1.5.4 [created] db62c7a78 refs/tags/1.5.5 [created] 8f3ae6bab refs/tags/1.5.5-QATEST [created] 6b4e4be3a refs/tags/1.5.6 [created] 75591a935 refs/tags/1.5.7 [created] c9020c405 refs/tags/1.5.7-QATEST [created] 80b21ebb8 refs/tags/1.5.7-TEST [created] 57c19c20a refs/tags/1.5.8 [created] 210366eb7 refs/tags/1.5.9 [created] cc595929c refs/tags/1.6.1 [created] b418cea04 refs/tags/1.6.10 [created] f1c424bd2 refs/tags/1.6.11 [created] babff41f0 refs/tags/1.6.12 [created] 
a22010d32 refs/tags/1.6.2 [created] 072e3b3ce refs/tags/1.6.3 [created] 9d0212018 refs/tags/1.6.5 [created] 62c101cf3 refs/tags/1.6.6 [created] f14c6fb25 refs/tags/1.6.7 [created] 8e5ecdde0 refs/tags/1.6.8 [created] 4ef6b1743 refs/tags/1.6.8-QAVS1902 [created] 5739b6a1c refs/tags/1.6.9 [created] 93723542e refs/tags/1.7.1 [created] 3dd286282 refs/tags/1.7.2 [created] 9e67197e3 refs/tags/1.7.3 [created] 596479c91 refs/tags/1.7.4 [created] 3fae2e313 refs/tags/1.7.4-p1 [created] 0da3c2ed0 refs/tags/1.7.5 [created] ba3cccc88 refs/tags/1.8.0-b1 [created] 947266517 refs/tags/1.8.0.b2 [created] 5b0cedfe4 refs/tags/1.8.1 [created] 8fe1fc191 refs/tags/1.8.2 [created] f255ff094 refs/tags/2.2.4-test [created] e8e0d75d7 refs/tags/2.2.5-test [created] 36cb161e9
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java new file mode 100644 index 0000000..6fa039d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/migration/UpgradePendingTreeToPerPartitionTask.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.migration; + +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IndexStorage; +import org.apache.ignite.internal.processors.cache.persistence.RootPage; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; +import org.apache.ignite.internal.processors.cache.tree.PendingRow; +import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree.WITHOUT_KEY; + +/** + * Ignite native persistence migration task upgrades existed PendingTrees to per-partition basis. 
It's ignore possible + * assertions errors when a pointer to an entry exists in tree but the entry itself was removed due to some reason (e.g. + * when partition was evicted after restart). + * + * Task goes through persistent cache groups and copy entries to certain partitions. + */ +public class UpgradePendingTreeToPerPartitionTask implements IgniteCallable<Boolean> { + /** */ + private static final String PENDING_ENTRIES_TREE_NAME = "PendingEntries"; + + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final int BATCH_SIZE = 500; + + /** */ + @IgniteInstanceResource + private IgniteEx node; + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public Boolean call() throws IgniteException { + GridCacheSharedContext<Object, Object> sharedCtx = node.context().cache().context(); + + for (CacheGroupContext grp : sharedCtx.cache().cacheGroups()) { + if (!grp.persistenceEnabled() || !grp.affinityNode()) { + if (!grp.persistenceEnabled()) + log.info("Skip pending tree upgrade for non-persistent cache group: [grpId=" + grp.groupId() + + ", grpName=" + grp.name() + ']'); + else + log.info("Skip pending tree upgrade on non-affinity node for cache group: [grpId=" + grp.groupId() + + ", grpName=" + grp.name() + ']'); + + continue; + } + + try { + processCacheGroup(grp); + } + catch (Exception ex) { + if (Thread.interrupted() || X.hasCause(ex, InterruptedException.class)) + log.info("Upgrade pending tree has been cancelled."); + else + log.warning("Failed to upgrade pending tree for cache group: [grpId=" + grp.groupId() + + ", grpName=" + grp.name() + ']', ex); + + return false; + } + + if (Thread.interrupted()) { + log.info("Upgrade pending tree has been cancelled."); + + return false; + } + } + + log.info("All pending trees upgraded successfully."); + + return true; + } + + /** + * Converts CacheGroup pending tree to per-partition basis. + * + * @param grp Cache group. 
+ * @throws IgniteCheckedException If error occurs. + */ + private void processCacheGroup(CacheGroupContext grp) throws IgniteCheckedException { + assert grp.offheap() instanceof GridCacheOffheapManager; + + PendingEntriesTree oldPendingTree; + + final IgniteCacheDatabaseSharedManager db = grp.shared().database(); + + db.checkpointReadLock(); + try { + IndexStorage indexStorage = ((GridCacheOffheapManager)grp.offheap()).getIndexStorage(); + + //TODO: IGNITE-5874: replace with some check-method to avoid unnecessary page allocation. + RootPage pendingRootPage = indexStorage.getOrAllocateForTree(PENDING_ENTRIES_TREE_NAME); + + if (pendingRootPage.isAllocated()) { + log.info("No pending tree found for cache group: [grpId=" + grp.groupId() + + ", grpName=" + grp.name() + ']'); + + // Nothing to do here as just allocated tree is obviously empty. + indexStorage.dropRootPage(PENDING_ENTRIES_TREE_NAME); + + return; + } + + oldPendingTree = new PendingEntriesTree( + grp, + PENDING_ENTRIES_TREE_NAME, + grp.dataRegion().pageMemory(), + pendingRootPage.pageId().pageId(), + ((GridCacheOffheapManager)grp.offheap()).reuseListForIndex(null), + false + ); + } + finally { + db.checkpointReadUnlock(); + } + + processPendingTree(grp, oldPendingTree); + + if (Thread.currentThread().isInterrupted()) + return; + + db.checkpointReadLock(); + try { + oldPendingTree.destroy(); + } + finally { + db.checkpointReadUnlock(); + } + } + + /** + * Move pending rows for CacheGroup entries to per-partition PendingTree. + * Invalid pending rows will be ignored. + * + * @param grp Cache group. + * @param oldPendingEntries Old-style PendingTree. + * @throws IgniteCheckedException If error occurs. 
+ */ + private void processPendingTree(CacheGroupContext grp, PendingEntriesTree oldPendingEntries) + throws IgniteCheckedException { + final PageMemory pageMemory = grp.dataRegion().pageMemory(); + + final IgniteCacheDatabaseSharedManager db = grp.shared().database(); + + final Set<Integer> cacheIds = grp.cacheIds(); + + PendingRow row = null; + + int processedEntriesCnt = 0; + int skippedEntries = 0; + + // Re-acquire checkpoint lock for every next batch. + while (!Thread.currentThread().isInterrupted()) { + int cnt = 0; + + db.checkpointReadLock(); + try { + GridCursor<PendingRow> cursor = oldPendingEntries.find(row, null, WITHOUT_KEY); + + while (cnt++ < BATCH_SIZE && cursor.next()) { + row = cursor.get(); + + assert row.link != 0 && row.expireTime != 0 : row; + + GridCacheEntryEx entry; + + // Lost cache or lost entry. + if (!cacheIds.contains(row.cacheId) || (entry = getEntry(grp, row)) == null) { + skippedEntries++; + + oldPendingEntries.removex(row); + + continue; + } + + entry.lockEntry(); + try { + if (processRow(pageMemory, grp, row)) + processedEntriesCnt++; + else + skippedEntries++; + } + finally { + entry.unlockEntry(); + } + + oldPendingEntries.removex(row); + } + + if (cnt < BATCH_SIZE) + break; + } + finally { + db.checkpointReadUnlock(); + } + } + + log.info("PendingTree upgraded: " + + "[grpId=" + grp.groupId() + + ", grpName=" + grp.name() + + ", processedEntries=" + processedEntriesCnt + + ", failedEntries=" + skippedEntries + + ']'); + } + + /** + * Return CacheEntry instance for lock purpose. + * + * @param grp Cache group + * @param row Pending row. + * @return CacheEntry if found or null otherwise. 
+ */ + private GridCacheEntryEx getEntry(CacheGroupContext grp, PendingRow row) { + try { + CacheDataRowAdapter rowData = new CacheDataRowAdapter(row.link); + + rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY); + + GridCacheContext cctx = grp.shared().cacheContext(row.cacheId); + + assert cctx != null; + + return cctx.cache().entryEx(rowData.key()); + } + catch (Throwable ex) { + if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) + throw new IgniteException(new InterruptedException()); + + log.warning("Failed to move old-version pending entry " + + "to per-partition PendingTree: key not found (skipping): " + + "[grpId=" + grp.groupId() + + ", grpName=" + grp.name() + + ", pendingRow=" + row + "]"); + + return null; + } + + } + + /** + * Validates PendingRow and add it to per-partition PendingTree. + * + * @param pageMemory Page memory. + * @param grp Cache group. + * @param row Pending row. + * @return {@code True} if pending row successfully moved, {@code False} otherwise. 
+ */ + private boolean processRow(PageMemory pageMemory, CacheGroupContext grp, PendingRow row) { + final long pageId = PageIdUtils.pageId(row.link); + + final int partition = PageIdUtils.partId(pageId); + + assert partition >= 0; + + try { + final long page = pageMemory.acquirePage(grp.groupId(), pageId); + long pageAddr = pageMemory.readLock(grp.groupId(), pageId, page); + try { + assert PageIO.getType(pageAddr) != 0; + assert PageIO.getVersion(pageAddr) != 0; + + IgniteCacheOffheapManager.CacheDataStore store = + ((GridCacheOffheapManager)grp.offheap()).dataStore(partition); + + if (store == null) { + log.warning("Failed to move old-version pending entry " + + "to per-partition PendingTree: Node has no partition anymore (skipping): " + + "[grpId=" + grp.groupId() + + ", grpName=" + grp.name() + + ", partId=" + partition + + ", pendingRow=" + row + "]"); + + return false; + } + + assert store instanceof GridCacheOffheapManager.GridCacheDataStore; + assert store.pendingTree() != null; + + store.pendingTree().invoke(row, WITHOUT_KEY, new PutIfAbsentClosure(row)); + } + finally { + pageMemory.readUnlock(grp.groupId(), pageId, page); + } + } + catch (AssertionError | Exception ex) { + if (Thread.currentThread().isInterrupted() || X.hasCause(ex, InterruptedException.class)) { + Thread.currentThread().interrupt(); + + throw new IgniteException(ex); + } + + String msg = "Unexpected error occurs while moving old-version pending entry " + + "to per-partition PendingTree. 
Seems page doesn't longer exists (skipping): " + + "[grpId=" + grp.groupId() + + ", grpName=" + grp.name() + + ", partId=" + partition + + ", pendingRow=" + row + ']'; + + if (log.isDebugEnabled()) + log.warning(msg, ex); + else + log.warning(msg); + + return false; + } + + return true; + } + + /** */ + private static class PutIfAbsentClosure implements IgniteTree.InvokeClosure<PendingRow> { + /** */ + private final PendingRow pendingRow; + + /** */ + private IgniteTree.OperationType op; + + /** */ + PutIfAbsentClosure(PendingRow pendingRow) { + this.pendingRow = pendingRow; + } + + /** {@inheritDoc} */ + @Override public void call(@Nullable PendingRow oldRow) throws IgniteCheckedException { + op = (oldRow == null) ? IgniteTree.OperationType.PUT : IgniteTree.OperationType.NOOP; + } + + /** {@inheritDoc} */ + @Override public PendingRow newRow() { + return pendingRow; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return op; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java index a5236c2..c940c39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java @@ -191,7 +191,6 @@ public abstract class PageIO { /** */ public static final short T_DATA_REF_METASTORAGE_LEAF = 23; - /** Index for payload == 1. 
*/ public static final short T_H2_EX_REF_LEAF_START = 10000; @@ -215,8 +214,8 @@ public abstract class PageIO { * @param ver Page format version. */ protected PageIO(int type, int ver) { - assert ver > 0 && ver < 65535: ver; - assert type > 0 && type < 65535: type; + assert ver > 0 && ver < 65535 : ver; + assert type > 0 && type < 65535 : type; this.type = type; this.ver = ver; @@ -245,7 +244,7 @@ public abstract class PageIO { public static void setType(long pageAddr, int type) { PageUtils.putShort(pageAddr, TYPE_OFF, (short)type); - assert getType(pageAddr) == type; + assert getType(pageAddr) == type : getType(pageAddr); } /** @@ -268,7 +267,7 @@ public abstract class PageIO { * @param pageAddr Page address. * @param ver Version. */ - private static void setVersion(long pageAddr, int ver) { + protected static void setVersion(long pageAddr, int ver) { PageUtils.putShort(pageAddr, VER_OFF, (short)ver); assert getVersion(pageAddr) == ver; @@ -580,7 +579,7 @@ public abstract class PageIO { * @param pageSize Page size. * @param sb Sb. */ - protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException ; + protected abstract void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException; /** * @param addr Address. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java index 3d79884..fe6b7a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java @@ -42,9 +42,13 @@ public class PagePartitionMetaIO extends PageMetaIO { /** */ private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1; + /** End of page partition meta. */ + static final int END_OF_PARTITION_PAGE_META = NEXT_PART_META_PAGE_OFF + 8; + /** */ public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>( - new PagePartitionMetaIO(1) + new PagePartitionMetaIO(1), + new PagePartitionMetaIOV2(2) ); /** {@inheritDoc} */ @@ -150,6 +154,7 @@ public class PagePartitionMetaIO extends PageMetaIO { /** * Returns partition counters page identifier, page with caches in cache group sizes. + * * @param pageAddr Partition metadata page address. * @return Next meta partial page ID or {@code 0} if it does not exist. */ @@ -167,19 +172,39 @@ public class PagePartitionMetaIO extends PageMetaIO { PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, cntrsPageId); } + /** + * Returns partition pending tree root. Pending tree is used to tracking expiring entries. + * + * @param pageAddr Page address. + * @return Pending Tree root page. 
+ */ + public long getPendingTreeRoot(long pageAddr) { + throw new UnsupportedOperationException("Per partition pending tree is not supported by " + + "this PagePartitionMetaIO version: ver=" + getVersion()); + } + + /** + * Sets new partition pending tree root. + * + * @param pageAddr Page address. + * @param treeRoot Pending Tree root + */ + public void setPendingTreeRoot(long pageAddr, long treeRoot) { + throw new UnsupportedOperationException("Per partition pending tree is not supported by " + + "this PagePartitionMetaIO version: ver=" + getVersion()); + } + /** {@inheritDoc} */ @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { super.printPage(pageAddr, pageSize, sb); byte state = getPartitionState(pageAddr); - sb - .a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr)) + sb.a(",\nPagePartitionMeta[\n\tsize=").a(getSize(pageAddr)) .a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr)) .a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr)) .a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")") .a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr)) - .a("\n]") - ; + .a("\n]"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java new file mode 100644 index 0000000..70556a1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIOV2.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.persistence.tree.io; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.util.GridStringBuilder; + +/** + * IO for partition metadata pages. + * Persistent partition contains it's own PendingTree. + */ +public class PagePartitionMetaIOV2 extends PagePartitionMetaIO { + /** */ + private static final int PENDING_TREE_ROOT_OFF = PagePartitionMetaIO.END_OF_PARTITION_PAGE_META; + + /** + * @param ver Version. 
+ */ + public PagePartitionMetaIOV2(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public void initNewPage(long pageAddr, long pageId, int pageSize) { + super.initNewPage(pageAddr, pageId, pageSize); + + setPendingTreeRoot(pageAddr, 0L); + } + + /** {@inheritDoc} */ + @Override public long getPendingTreeRoot(long pageAddr) { + return PageUtils.getLong(pageAddr, PENDING_TREE_ROOT_OFF); + } + + /** {@inheritDoc} */ + @Override public void setPendingTreeRoot(long pageAddr, long treeRoot) { + PageUtils.putLong(pageAddr, PENDING_TREE_ROOT_OFF, treeRoot); + } + + /** {@inheritDoc} */ + @Override protected void printPage(long pageAddr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { + byte state = getPartitionState(pageAddr); + + sb.a("PagePartitionMeta[\n\ttreeRoot=").a(getReuseListRoot(pageAddr)); + sb.a(",\n\tpendingTreeRoot=").a(getLastSuccessfulFullSnapshotId(pageAddr)); + sb.a(",\n\tlastSuccessfulFullSnapshotId=").a(getLastSuccessfulFullSnapshotId(pageAddr)); + sb.a(",\n\tlastSuccessfulSnapshotId=").a(getLastSuccessfulSnapshotId(pageAddr)); + sb.a(",\n\tnextSnapshotTag=").a(getNextSnapshotTag(pageAddr)); + sb.a(",\n\tlastSuccessfulSnapshotTag=").a(getLastSuccessfulSnapshotTag(pageAddr)); + sb.a(",\n\tlastAllocatedPageCount=").a(getLastAllocatedPageCount(pageAddr)); + sb.a(",\n\tcandidatePageCount=").a(getCandidatePageCount(pageAddr)); + sb.a(",\n\tsize=").a(getSize(pageAddr)); + sb.a(",\n\tupdateCounter=").a(getUpdateCounter(pageAddr)); + sb.a(",\n\tglobalRemoveId=").a(getGlobalRemoveId(pageAddr)); + sb.a(",\n\tpartitionState=").a(state).a("(").a(GridDhtPartitionState.fromOrdinal(state)).a(")"); + sb.a(",\n\tcountersPageId=").a(getCountersPageId(pageAddr)); + sb.a("\n]"); + } + + /** + * Upgrade page to PagePartitionMetaIOV2 + * + * @param pageAddr Page address. 
+ */ + public void upgradePage(long pageAddr) { + assert PageIO.getType(pageAddr) == getType(); + assert PageIO.getVersion(pageAddr) < 2; + + PageIO.setVersion(pageAddr, getVersion()); + setPendingTreeRoot(pageAddr, 0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java index 43a2303..ebe6f29 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java @@ -55,10 +55,11 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac Mockito.when(topologyMock.partitions()).thenReturn(3); List<GridDhtLocalPartition> localPartitions = Lists.newArrayList( - partitionMock(0, 1, 1), - partitionMock(1, 2, 2), - partitionMock(2, 3, 3) + partitionMock(0, 1, 1), + partitionMock(1, 2, 2), + partitionMock(2, 3, 3) ); + Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions); Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions); } @@ -82,10 +83,13 @@ public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstrac */ private GridDhtPartitionsSingleMessage from(@Nullable Map<Integer, T2<Long, Long>> countersMap, @Nullable Map<Integer, Long> sizesMap) { GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(); + if (countersMap != null) msg.addPartitionUpdateCounters(0, 
countersMap); + if (sizesMap != null) msg.addPartitionSizes(0, sizesMap); + return msg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java index 99614ed..a02ed11 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java @@ -17,32 +17,35 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.io.Serializable; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import 
org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** - * + * Cause by https://issues.apache.org/jira/browse/IGNITE-7278 */ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { /** */ @@ -52,7 +55,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { private static final int ENTRIES_COUNT = 10_000; /** */ - public static final String CACHE_NAME = "cache1"; + protected static final String CACHE_NAME = "cache1"; /** Checkpoint delay. */ private volatile int checkpointDelay = -1; @@ -79,21 +82,23 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setMaxSize(400 * 1024 * 1024).setPersistenceEnabled(true)) + new DataRegionConfiguration() + .setMaxSize(400 * 1024 * 1024) + .setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY) .setCheckpointFrequency(checkpointDelay); cfg.setDataStorageConfiguration(memCfg); - CacheConfiguration ccfg1 = new CacheConfiguration(); + CacheConfiguration ccfg = new CacheConfiguration(); - ccfg1.setName(CACHE_NAME); - ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg1.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg1.setAffinity(new RendezvousAffinityFunction(false, 128)); - ccfg1.setBackups(2); + ccfg.setName(CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); + ccfg.setBackups(2); - 
cfg.setCacheConfiguration(ccfg1); + cfg.setCacheConfiguration(ccfg); return cfg; } @@ -197,7 +202,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { } /** - * * @throws Exception if failed. */ public void testRebalncingDuringLoad_10_10_1_1() throws Exception { @@ -205,7 +209,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { } /** - * * @throws Exception if failed. */ public void testRebalncingDuringLoad_10_500_8_16() throws Exception { @@ -227,7 +230,7 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { final Ignite load = ignite(0); - load.active(true); + load.cluster().active(true); try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) { s.allowOverwrite(true); @@ -245,10 +248,13 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { Random rnd = ThreadLocalRandom.current(); while (!done.get()) { - Map<Integer, Integer> map = new TreeMap<>(); + Map<Integer, Person> map = new TreeMap<>(); - for (int i = 0; i < batch; i++) - map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt()); + for (int i = 0; i < batch; i++) { + int key = rnd.nextInt(ENTRIES_COUNT); + + map.put(key, new Person("fn" + key, "ln" + key)); + } cache.putAll(map); } @@ -277,4 +283,51 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { busyFut.get(); } + + /** + * + */ + static class Person implements Serializable { + /** */ + @GridToStringInclude + @QuerySqlField(index = true, groups = "full_name") + private String fName; + + /** */ + @GridToStringInclude + @QuerySqlField(index = true, groups = "full_name") + private String lName; + + /** + * @param fName First name. + * @param lName Last name. 
+ */ + public Person(String fName, String lName) { + this.fName = fName; + this.lName = lName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + return Objects.equals(fName, person.fName) && Objects.equals(lName, person.lName); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(fName, lName); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java deleted file mode 100644 index 66b2047..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest2.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence; - -import java.util.Map; -import java.util.Random; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Cause by https://issues.apache.org/jira/browse/IGNITE-7278 - */ -public class IgnitePdsContinuousRestartTest2 extends GridCommonAbstractTest { - /** */ - private static final int GRID_CNT = 4; - - /** */ - private static final int ENTRIES_COUNT = 10_000; - - /** */ - public static final String CACHE_NAME = "cache1"; - - /** Checkpoint delay. 
*/ - private volatile int checkpointDelay = -1; - - /** */ - private boolean cancel; - - /** - * Default constructor. - */ - public IgnitePdsContinuousRestartTest2() { - - } - - /** - * @param cancel Cancel. - */ - public IgnitePdsContinuousRestartTest2(boolean cancel) { - this.cancel = cancel; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - DataStorageConfiguration memCfg = new DataStorageConfiguration() - .setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setMaxSize(400 * 1024 * 1024) - .setPersistenceEnabled(true)) - .setWalMode(WALMode.LOG_ONLY) - .setCheckpointFrequency(checkpointDelay); - - cfg.setDataStorageConfiguration(memCfg); - - CacheConfiguration ccfg = new CacheConfiguration(); - - ccfg.setName(CACHE_NAME); - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); - ccfg.setBackups(2); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - cleanPersistenceDir(); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_1000_500_1_1() throws Exception { - checkRebalancingDuringLoad(1000, 500, 1, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_500_1_1() throws Exception { - checkRebalancingDuringLoad(8000, 500, 1, 1); - } - - /** - * @throws Exception if failed. 
- */ - public void testRebalancingDuringLoad_1000_20000_1_1() throws Exception { - checkRebalancingDuringLoad(1000, 20000, 1, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_8000_1_1() throws Exception { - checkRebalancingDuringLoad(8000, 8000, 1, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_1000_500_8_1() throws Exception { - checkRebalancingDuringLoad(1000, 500, 8, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_500_8_1() throws Exception { - checkRebalancingDuringLoad(8000, 500, 8, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_1000_20000_8_1() throws Exception { - checkRebalancingDuringLoad(1000, 20000, 8, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_8000_8_1() throws Exception { - checkRebalancingDuringLoad(8000, 8000, 8, 1); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_1000_500_8_16() throws Exception { - checkRebalancingDuringLoad(1000, 500, 8, 16); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_500_8_16() throws Exception { - checkRebalancingDuringLoad(8000, 500, 8, 16); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_1000_20000_8_16() throws Exception { - checkRebalancingDuringLoad(1000, 20000, 8, 16); - } - - /** - * @throws Exception if failed. - */ - public void testRebalancingDuringLoad_8000_8000_8_16() throws Exception { - checkRebalancingDuringLoad(8000, 8000, 8, 16); - } - - /** - * - * @throws Exception if failed. - */ - public void testRebalncingDuringLoad_10_10_1_1() throws Exception { - checkRebalancingDuringLoad(10, 10, 1, 1); - } - - /** - * - * @throws Exception if failed. 
- */ - public void testRebalncingDuringLoad_10_500_8_16() throws Exception { - checkRebalancingDuringLoad(10, 500, 8, 16); - } - - /** - * @throws Exception if failed. - */ - private void checkRebalancingDuringLoad( - int restartDelay, - int checkpointDelay, - int threads, - final int batch - ) throws Exception { - this.checkpointDelay = checkpointDelay; - - startGrids(GRID_CNT); - - final Ignite load = ignite(0); - - load.cluster().active(true); - - try (IgniteDataStreamer<Object, Object> s = load.dataStreamer(CACHE_NAME)) { - s.allowOverwrite(true); - - for (int i = 0; i < ENTRIES_COUNT; i++) - s.addData(i, i); - } - - final AtomicBoolean done = new AtomicBoolean(false); - - IgniteInternalFuture<?> busyFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - /** {@inheritDoc} */ - @Override public Object call() throws Exception { - IgniteCache<Object, Object> cache = load.cache(CACHE_NAME); - Random rnd = ThreadLocalRandom.current(); - - while (!done.get()) { - Map<Integer, Integer> map = new TreeMap<>(); - - for (int i = 0; i < batch; i++) - map.put(rnd.nextInt(ENTRIES_COUNT), rnd.nextInt()); - - cache.putAll(map); - } - - return null; - } - }, threads, "updater"); - - long end = System.currentTimeMillis() + 90_000; - - Random rnd = ThreadLocalRandom.current(); - - while (System.currentTimeMillis() < end) { - int idx = rnd.nextInt(GRID_CNT - 1) + 1; - - stopGrid(idx, cancel); - - U.sleep(restartDelay); - - startGrid(idx); - - U.sleep(restartDelay); - } - - done.set(true); - - busyFut.get(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java new file mode 100644 index 0000000..d5b3f55 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTestWithExpiryPolicy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import java.util.concurrent.TimeUnit; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +/** + * Caused by https://issues.apache.org/jira/browse/IGNITE-5879 + */ +public class IgnitePdsContinuousRestartTestWithExpiryPolicy extends IgnitePdsContinuousRestartTest { + /** Ip finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Default constructor. 
+ */ + public IgnitePdsContinuousRestartTestWithExpiryPolicy() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + discoverySpi.setIpFinder(ipFinder); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setGroupName("Group1"); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); + ccfg.setBackups(2); + ccfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1))); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java index 1825666..03dc445 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/IgniteBaselineAbstractFullApiSelfTest.java @@ -33,7 +33,7 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration() - .setMaxSize(200 * 1024 * 1024) 
+ .setMaxSize(256 * 1024 * 1024) .setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY)); @@ -41,6 +41,13 @@ public abstract class IgniteBaselineAbstractFullApiSelfTest extends GridCacheAbs } /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ @Override protected int gridCount() { return 4; } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java new file mode 100644 index 0000000..be09e70 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import com.google.common.base.Strings; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import javax.cache.expiry.AccessedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test TTL worker with persistence enabled + */ +public class IgnitePdsWithTtlTest extends GridCommonAbstractTest { + /** */ + public static final String CACHE = "expirableCache"; + + /** */ + private static final int EXPIRATION_TIMEOUT = 10; + + /** */ + public static final int ENTRIES = 7000; + + /** */ + private static final TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + //protection if test failed to finish, e.g. 
by error + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(FINDER); + + cfg.setDiscoverySpi(disco); + + final CacheConfiguration ccfg = new CacheConfiguration(); + ccfg.setName(CACHE); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); + ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRATION_TIMEOUT))); + ccfg.setEagerTtl(true); + ccfg.setGroupName("group1"); + + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(256L * 1024 * 1024) + .setPersistenceEnabled(true) + ).setWalMode(WALMode.DEFAULT)); + + cfg.setCacheConfiguration(ccfg); + return cfg; + } + + /** + * @throws Exception if failed. + */ + public void testTtlIsApplied() throws Exception { + loadAndWaitForCleanup(false); + } + + /** + * @throws Exception if failed. + */ + public void testTtlIsAppliedAfterRestart() throws Exception { + loadAndWaitForCleanup(true); + } + + /** + * @throws Exception if failed. + */ + private void loadAndWaitForCleanup(boolean restartGrid) throws Exception { + IgniteEx srv = startGrid(0); + srv.cluster().active(true); + + fillCache(srv.cache(CACHE)); + + if (restartGrid) { + stopGrid(0); + srv = startGrid(0); + srv.cluster().active(true); + } + + final IgniteCache<Integer, String> cache = srv.cache(CACHE); + + pringStatistics((IgniteCacheProxy)cache, "After restart from LFS"); + + waitAndCheckExpired(cache); + + stopAllGrids(); + } + + /** + * @throws Exception if failed. 
+ */ + public void testRebalancingWithTtlExpirable() throws Exception { + IgniteEx srv = startGrid(0); + srv.cluster().active(true); + + fillCache(srv.cache(CACHE)); + + //causes rebalancing start + srv = startGrid(1); + + final IgniteCache<Integer, String> cache = srv.cache(CACHE); + + pringStatistics((IgniteCacheProxy)cache, "After rebalancing start"); + + waitAndCheckExpired(cache); + + stopAllGrids(); + } + + /** */ + protected void fillCache(IgniteCache<Integer, String> cache) { + cache.putAll(new TreeMap<Integer, String>() {{ + for (int i = 0; i < ENTRIES; i++) + put(i, Strings.repeat("Some value " + i, 125)); + }}); + + //Touch entries. + for (int i = 0; i < ENTRIES; i++) + cache.get(i); // touch entries + + pringStatistics((IgniteCacheProxy)cache, "After cache puts"); + } + + /** */ + protected void waitAndCheckExpired(final IgniteCache<Integer, String> cache) throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cache.size() == 0; + } + }, TimeUnit.SECONDS.toMillis(EXPIRATION_TIMEOUT + 1)); + + pringStatistics((IgniteCacheProxy)cache, "After timeout"); + + for (int i = 0; i < ENTRIES; i++) + assertNull(cache.get(i)); + } + + /** */ + private void pringStatistics(IgniteCacheProxy cache, String msg) { + System.out.println(msg + " {{"); + cache.context().printMemoryStats(); + System.out.println("}} " + msg); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 1e32320..ab81d8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicC import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsDataRegionMetricsTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWithTtlTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.DefaultPageSizeBackwardsCompatibilityTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsPageReplacementTest; @@ -111,6 +112,7 @@ public class IgnitePdsTestSuite extends TestSuite { // TODO uncomment when https://issues.apache.org/jira/browse/IGNITE-7510 is fixed // suite.addTestSuite(IgnitePdsClientNearCachePutGetTest.class); suite.addTestSuite(IgniteDbPutGetWithCacheStoreTest.class); + suite.addTestSuite(IgnitePdsWithTtlTest.class); suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 3f6f713..76cfe4f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -20,7 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; 
import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest; -import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest2; +import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithExpiryPolicy; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedCacheDataTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest; @@ -90,7 +90,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { // Rebalancing test suite.addTestSuite(IgnitePdsContinuousRestartTest.class); - suite.addTestSuite(IgnitePdsContinuousRestartTest2.class); + suite.addTestSuite(IgnitePdsContinuousRestartTestWithExpiryPolicy.class); suite.addTestSuite(IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes.class); @@ -115,7 +115,6 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgnitePdsWholeClusterRestartTest.class); - // Rebalancing test suite.addTestSuite(IgniteWalHistoryReservationsTest.class);
