This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5933b4e IGNITE-12231 Fixed logging RollbackRecords to WAL
5933b4e is described below
commit 5933b4ec62de26ce25a8cc1a5be9ba4fcab7befc
Author: Andrey Gura <[email protected]>
AuthorDate: Wed Sep 25 16:21:29 2019 +0300
IGNITE-12231 Fixed logging RollbackRecords to WAL
---
.../dht/topology/GridDhtPartitionTopologyImpl.java | 16 +++-
.../cache/transactions/IgniteTxHandler.java | 91 ++++++++++++----------
2 files changed, 65 insertions(+), 42 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index e5d5651..2078eab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -42,6 +42,7 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -2734,6 +2735,8 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
ctx.database().checkpointReadLock();
try {
+ WALPointer ptr = null;
+
lock.readLock().lock();
try {
@@ -2764,7 +2767,7 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
gapStart - 1, gapStop - gapStart + 1);
try {
- ctx.wal().log(rec);
+ ptr = ctx.wal().log(rec);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -2779,7 +2782,16 @@ public class GridDhtPartitionTopologyImpl implements
GridDhtPartitionTopology {
}
}
finally {
- lock.readLock().unlock();
+ try {
+ if (ptr != null)
+ ctx.wal().flush(ptr, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
}
}
finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e62665f..4055709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -31,9 +31,11 @@ import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -2288,65 +2290,74 @@ public class IgniteTxHandler {
if (counters == null)
return;
- for (PartitionUpdateCountersMessage counter : counters) {
- GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
+ WALPointer ptr = null;
- GridDhtPartitionTopology top = ctx0.topology();
+ try {
+ for (PartitionUpdateCountersMessage counter : counters) {
+ GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
- AffinityTopologyVersion topVer = top.readyTopologyVersion();
+ GridDhtPartitionTopology top = ctx0.topology();
- assert top != null;
+ AffinityTopologyVersion topVer = top.readyTopologyVersion();
- for (int i = 0; i < counter.size(); i++) {
- boolean invalid = false;
+ assert top != null;
- try {
- GridDhtLocalPartition part =
top.localPartition(counter.partition(i));
+ for (int i = 0; i < counter.size(); i++) {
+ boolean invalid = false;
- if (part != null && part.reserve()) {
- try {
- if (part.state() != GridDhtPartitionState.RENTING)
{ // Check is actual only for backup node.
- long start = counter.initialCounter(i);
- long delta = counter.updatesCount(i);
+ try {
+ GridDhtLocalPartition part =
top.localPartition(counter.partition(i));
+
+ if (part != null && part.reserve()) {
+ try {
+ if (part.state() !=
GridDhtPartitionState.RENTING) { // Check is actual only for backup node.
+ long start = counter.initialCounter(i);
+ long delta = counter.updatesCount(i);
- boolean updated = part.updateCounter(start,
delta);
+ boolean updated =
part.updateCounter(start, delta);
- // Need to log rolled back range for logical
recovery.
- if (updated && rollback) {
- if (part.group().persistenceEnabled() &&
- part.group().walEnabled() &&
- !part.group().mvccEnabled()) {
- RollbackRecord rec = new
RollbackRecord(part.group().groupId(), part.id(),
- start, delta);
+ // Need to log rolled back range for
logical recovery.
+ if (updated && rollback) {
+ CacheGroupContext grpCtx =
part.group();
- ctx.wal().log(rec);
- }
+ if (grpCtx.persistenceEnabled() &&
grpCtx.walEnabled() && !grpCtx.mvccEnabled()) {
+ RollbackRecord rec =
+ new
RollbackRecord(grpCtx.groupId(), part.id(), start, delta);
+
+ ptr = ctx.wal().log(rec);
+ }
- for (int cntr = 1; cntr <= delta; cntr++) {
-
ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
- topVer, rollbackOnPrimary);
+ for (int cntr = 1; cntr <= delta;
cntr++) {
+
ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
+ topVer, rollbackOnPrimary);
+ }
}
}
+ else
+ invalid = true;
+ }
+ finally {
+ part.release();
}
- else
- invalid = true;
- }
- finally {
- part.release();
}
+ else
+ invalid = true;
}
- else
+ catch (GridDhtInvalidPartitionException e) {
invalid = true;
- }
- catch (GridDhtInvalidPartitionException e) {
- invalid = true;
- }
+ }
- if (invalid && log.isDebugEnabled())
- log.debug("Received partition update counters message for
invalid partition, ignoring: " +
- "[cacheId=" + counter.cacheId() + ", part=" +
counter.partition(i) + "]");
+ if (invalid && log.isDebugEnabled()) {
+ log.debug("Received partition update counters message
for invalid partition, ignoring: " +
+ "[cacheId=" + counter.cacheId() + ", part=" +
counter.partition(i) + ']');
+ }
+ }
}
}
+ finally {
+ if (ptr != null)
+ ctx.wal().flush(ptr, false);
+ }
}
/**