IGNITE-3477 - Log partition destroy to WAL
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e4b05d53 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e4b05d53 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e4b05d53 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: e4b05d53d52ce3500e409d7275942d92e23db01b Parents: b51a2f8 Author: Alexey Goncharuk <[email protected]> Authored: Tue Mar 7 18:01:31 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Mar 7 18:01:31 2017 +0300 ---------------------------------------------------------------------- .../internal/pagemem/wal/record/WALRecord.java | 5 +- .../MetaPageUpdatePartitionDataRecord.java | 17 +++- .../record/delta/PartitionDestroyRecord.java | 73 ++++++++++++++++++ .../record/delta/PartitionMetaStateRecord.java | 2 +- .../dht/preloader/GridDhtPartitionDemander.java | 81 ++++++++++---------- 5 files changed, 135 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e4b05d53/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 9c2c88a..142f0ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -160,7 +160,10 @@ public abstract class WALRecord { DATA_PAGE_UPDATE_RECORD, /** init */ - BTREE_META_PAGE_INIT_ROOT2 + BTREE_META_PAGE_INIT_ROOT2, + + /** Partition destroy. */ + PARTITION_DESTROY ; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e4b05d53/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java index 66efc6f..ef57c46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.util.typedef.internal.S; /** * @@ -45,8 +46,15 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { * @param pageId Page ID. * @param allocatedIdxCandidate Page Allocated index candidate */ - public MetaPageUpdatePartitionDataRecord(int cacheId, long pageId, long updateCntr, long globalRmvId, int partSize, - byte state, int allocatedIdxCandidate) { + public MetaPageUpdatePartitionDataRecord( + int cacheId, + long pageId, + long updateCntr, + long globalRmvId, + int partSize, + byte state, + int allocatedIdxCandidate + ) { super(cacheId, pageId); this.updateCntr = updateCntr; @@ -104,4 +112,9 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { @Override public RecordType type() { return RecordType.PARTITION_META_PAGE_UPDATE_COUNTERS; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MetaPageUpdatePartitionDataRecord.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e4b05d53/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionDestroyRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionDestroyRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionDestroyRecord.java new file mode 100644 index 0000000..c3b8200 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionDestroyRecord.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record.delta; + +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; + +/** + * + */ +public class PartitionDestroyRecord extends WALRecord { + /** */ + private int cacheId; + + /** */ + private int partId; + + /** + * @param cacheId Cache ID. + * @param partId Partition ID. + */ + public PartitionDestroyRecord(int cacheId, int partId) { + this.cacheId = cacheId; + this.partId = partId; + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.PARTITION_DESTROY; + } + + /** + * @return Cache ID. + */ + public int cacheId() { + return cacheId; + } + + /** + * @param cacheId Cache ID. + */ + public void cacheId(int cacheId) { + this.cacheId = cacheId; + } + + /** + * @return Partition ID. + */ + public int partitionId() { + return partId; + } + + /** + * @param partId Partition ID. + */ + public void partitionId(int partId) { + this.partId = partId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e4b05d53/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionMetaStateRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionMetaStateRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionMetaStateRecord.java index fd8b2fd..95e1a56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionMetaStateRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PartitionMetaStateRecord.java @@ -69,7 +69,7 @@ public class PartitionMetaStateRecord extends WALRecord { /** * */ - public int partId() { + public int partitionId() { return partId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e4b05d53/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index ce43ec4..4f8d13b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -778,8 +778,6 @@ public class GridDhtPartitionDemander { GridCacheEntryInfo entry, AffinityTopologyVersion topVer ) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); - try { GridCacheEntryEx cached = null; @@ -800,34 +798,41 @@ public class GridDhtPartitionDemander { return true; } - if (preloadPred == null || preloadPred.apply(entry)) { - if (cached.initialValue( - entry.value(), - entry.version(), - entry.ttl(), - entry.expireTime(), - true, - topVer, - cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, - false - )) { - cctx.evicts().touch(cached, topVer); // Start tracking. - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null, true); - } - else { - cctx.evicts().touch(cached, topVer); // Start tracking. + cctx.shared().database().checkpointReadLock(); - if (log.isDebugEnabled()) - log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + - ", part=" + p + ']'); + try { + if (preloadPred == null || preloadPred.apply(entry)) { + if (cached.initialValue( + entry.value(), + entry.version(), + entry.ttl(), + entry.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, + false + )) { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, + false, null, null, null, true); + } + else { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (log.isDebugEnabled()) + log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + p + ']'); + } } + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); + } + finally { + cctx.shared().database().checkpointReadUnlock(); } - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } catch (GridCacheEntryRemovedException ignored) { if (log.isDebugEnabled()) @@ -848,9 +853,6 @@ public class GridDhtPartitionDemander { throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); } - finally { - cctx.shared().database().checkpointReadUnlock(); - } return true; } @@ -911,8 +913,9 @@ public class GridDhtPartitionDemander { long updateSeq) { assert assigns != null; - this.exchFut = assigns.exchangeFuture(); - this.topVer = assigns.topologyVersion(); + exchFut = assigns.exchangeFuture(); + topVer = assigns.topologyVersion(); + this.cctx = cctx; this.log = log; this.startedEvtSent = startedEvtSent; @@ -924,13 +927,13 @@ public class GridDhtPartitionDemander { * Dummy future. Will be done by real one. */ public RebalanceFuture() { - this.exchFut = null; - this.topVer = null; - this.cctx = null; - this.log = null; - this.startedEvtSent = null; - this.stoppedEvtSent = null; - this.updateSeq = -1; + exchFut = null; + topVer = null; + cctx = null; + log = null; + startedEvtSent = null; + stoppedEvtSent = null; + updateSeq = -1; } /**
