This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 501fb9d7f9c IGNITE-18075 Added partition size and counter check on
snapshot create. (#10363)
501fb9d7f9c is described below
commit 501fb9d7f9c54e5693247ec25c4aa43876271c81
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Nov 18 10:33:01 2022 +0300
IGNITE-18075 Added partition size and counter check on snapshot create.
(#10363)
---
.../snapshot/IgniteSnapshotManager.java | 5 +-
.../SnapshotPartitionsQuickVerifyHandler.java | 100 +++++++++++++++
.../snapshot/SnapshotPartitionsVerifyHandler.java | 14 ++-
.../IgniteClusterSnapshotStreamerTest.java | 135 +++++++++++++++++++--
4 files changed, 238 insertions(+), 16 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index a08d8d372c4..75867df52fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1061,7 +1061,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
public void streamerWarning() {
SnapshotOperationRequest snpTask = currentCreateRequest();
- if (snpTask != null)
+ // Set the streamer warning flag on the current create request, if any, unless it is already set.
+ if (snpTask != null && !snpTask.streamerWarning())
snpTask.streamerWarning(true);
}
@@ -2232,6 +2232,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
// Register system default DataStreamer updates check.
registerHandler(new DataStreamerUpdatesHandler());
+ // Register system default page size and counters check that is
used at the creation operation.
+ registerHandler(new
SnapshotPartitionsQuickVerifyHandler(ctx.cache().context()));
+
// Register custom handlers.
SnapshotHandler<Object>[] extHnds =
(SnapshotHandler<Object>[])ctx.plugins().extensions(SnapshotHandler.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
new file mode 100644
index 00000000000..97a94e0313b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Quick partitions verifier. Warns if partition counters or sizes differ among the nodes, which can be caused by
+ * a canceled or failed DataStreamer. Skips checking if the DataStreamer warning is detected.
+ * <p>
+ * Full partition hashes are not calculated: only partition sizes and update counters are compared.
+ */
+public class SnapshotPartitionsQuickVerifyHandler extends SnapshotPartitionsVerifyHandler {
+ /** Warning message part explaining the possible cause of partition size or counter differences. */
+ public static final String WRN_MSG = "This may happen if DataStreamer with property 'allowOverwrite' set " +
+ "to `false` is loading during the snapshot or hadn't successfully finished earlier. However, you will be " +
+ "able to restore the rest of the caches from this snapshot.";
+
+ /**
+ * @param cctx Shared context.
+ */
+ public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext<?, ?> cctx) {
+ super(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SnapshotHandlerType type() {
+ return SnapshotHandlerType.CREATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<PartitionKeyV2, PartitionHashRecordV2> invoke(SnapshotHandlerContext opCtx)
+ throws IgniteCheckedException {
+ // Return null not to check partitions at all if the streamer warning is detected.
+ if (opCtx.streamerWarning())
+ return null;
+
+ Map<PartitionKeyV2, PartitionHashRecordV2> res = super.invoke(opCtx);
+
+ assert res != null;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void complete(
+ String name,
+ Collection<SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>> results
+ ) throws IgniteCheckedException {
+ // invoke() returns null when the streamer warning is detected: skip the comparison if any node did so.
+ if (results.stream().anyMatch(r -> r.data() == null))
+ return;
+
+ Set<Integer> wrnGrps = new HashSet<>();
+
+ Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+
+ // Compare each partition record with the first record collected for the same partition key.
+ F.viewReadOnly(results, SnapshotHandlerResult::data).forEach(m -> m.forEach((part, val) -> {
+ PartitionHashRecordV2 other = total.putIfAbsent(part, val);
+
+ if (other == null)
+ return;
+
+ if (val.size() != other.size() || !val.updateCounter().equals(other.updateCounter()))
+ wrnGrps.add(part.groupId());
+ }));
+
+ if (!wrnGrps.isEmpty()) {
+ throw new SnapshotHandlerWarningException("Cache partitions differ for cache groups " + S.compact(wrnGrps)
+ + ". " + WRN_MSG);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean skipHash() {
+ return true;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 6c408491d66..1a0ea4f1e68 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -81,7 +81,7 @@ import static
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtili
*/
public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<PartitionKeyV2, PartitionHashRecordV2>> {
/** Shared context. */
- private final GridCacheSharedContext<?, ?> cctx;
+ protected final GridCacheSharedContext<?, ?> cctx;
/** Logger. */
private final IgniteLogger log;
@@ -219,7 +219,8 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
GridDhtPartitionState.OWNING,
false,
size,
- snpMgr.partitionRowIterator(snpCtx, grpName,
partId, pageStore));
+ skipHash() ? F.emptyIterator()
+ : snpMgr.partitionRowIterator(snpCtx, grpName,
partId, pageStore));
assert hash != null : "OWNING must have hash: " + key;
@@ -275,6 +276,15 @@ public class SnapshotPartitionsVerifyHandler implements
SnapshotHandler<Map<Part
throw new IgniteCheckedException(buf.toString());
}
+ /**
+ * Provides flag to skip the full partition hash calculation. If {@code true}, an empty iterator is passed
+ * instead of the snapshot partition row iterator, so partition rows are not hashed.
+ * Default implementation returns {@code false}.
+ *
+ * @return {@code True} if full partition hash calculation should be skipped.
{@code False} otherwise.
+ */
+ protected boolean skipHash() {
+ return false;
+ }
+
/**
* Provides encryption keys stored within snapshot.
* <p>
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index c784f3b6f2f..1d1cd6d7e19 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -18,26 +18,34 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
- * Tests snapshot is consistent or snapshot process produces proper warning.
+ * Tests snapshot is consistent or snapshot process produces proper warning
with concurrent streaming.
*/
public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest {
/** */
@@ -83,19 +91,35 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
}
/**
- * Tests snapshot consistency wnen streamer starts before snapshot.
Default receiver.
+ * Tests snapshot warning when streamer is working during snapshot
creation. Default receiver.
*/
@Test
public void testStreamerWhileSnapshotDefault() throws Exception {
- doTestDataStreamerWhileSnapshot( false);
+ doTestDataStreamerWhileSnapshot(false);
}
/**
- * Tests snapshot consistency wnen streamer starts before snapshot.
Overwriting receiver.
+ * Tests snapshot has no warning when streamer is working during snapshot
creation. Overwriting receiver.
*/
@Test
public void testStreamerWhileSnapshotOverwriting() throws Exception {
- doTestDataStreamerWhileSnapshot( true);
+ doTestDataStreamerWhileSnapshot(true);
+ }
+
+ /**
+ * Tests snapshot warning when streamer failed or was canceled before
the snapshot. Default receiver.
+ */
+ @Test
+ public void testStreamerFailsLongAgoDefault() throws Exception {
+ doTestDataStreamerFailedBeforeSnapshot(false);
+ }
+
+ /**
+ * Tests snapshot has no warning when streamer failed or was canceled before
the snapshot. Overwriting receiver.
+ */
+ @Test
+ public void testStreamerFailsLongAgoOverwriting() throws Exception {
+ doTestDataStreamerFailedBeforeSnapshot(true);
}
/**
@@ -156,7 +180,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
AtomicBoolean stop = new AtomicBoolean();
- IgniteInternalFuture<?> loadFut = runLoad(grid(2), false, stop);
+ IgniteInternalFuture<?> loadFut = runLoad(client, false, stop);
try {
snpMgr.createSnapshot(SNAPSHOT_NAME).get();
@@ -175,29 +199,82 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
}
/**
- * Tests snapshot process throws warning if required.
+ * Tests snapshot creation while streamer is working. Expects only the DataStreamer warning if 'allowOverwrite'
is {@code false}, and no warning otherwise.
*
* @param allowOverwrite 'allowOverwrite' setting.
*/
private void doTestDataStreamerWhileSnapshot(boolean allowOverwrite)
throws Exception {
- AtomicBoolean stopLoading = new AtomicBoolean(false);
+ AtomicBoolean stopLoading = new AtomicBoolean();
+
+ TestRecordingCommunicationSpi clientCm =
+
(TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite,
stopLoading);
+ // Block the client's streamer requests to grid(0) and wait until batchesPerNode(grid(0)) of them are held.
+ clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+
+ clientCm.waitForBlocked(batchesPerNode(grid(0)));
+
try {
if (allowOverwrite)
- grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+ createAndCheckSnapshot(null, null);
else {
- assertThrows(null, () ->
snpMgr.createSnapshot(SNAPSHOT_NAME).get(),
- IgniteException.class, DataStreamerUpdatesHandler.WRN_MSG);
+ // The quick partitions check must not warn: it is skipped once the streamer warning is detected.
+ createAndCheckSnapshot(DataStreamerUpdatesHandler.WRN_MSG,
+ SnapshotPartitionsQuickVerifyHandler.WRN_MSG);
}
}
finally {
+ // Stop blocking the streamer requests before stopping the load.
+ clientCm.stopBlock();
stopLoading.set(true);
loadFut.get();
}
}
+ /**
+ * Tests snapshot creation after the streaming client node was stopped while its streamer requests to grid(0)
+ * were blocked. Expects only the quick partitions verify warning if 'allowOverwrite' is {@code false}, and no
+ * warning otherwise.
+ *
+ * @param allowOverwrite 'allowOverwrite' setting.
+ */
+ private void doTestDataStreamerFailedBeforeSnapshot(boolean
allowOverwrite) throws Exception {
+ TestRecordingCommunicationSpi clientCm =
+
(TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+ UUID clientId = client.localNode().id();
+
+ CountDownLatch nodeGoneLatch = new CountDownLatch(1);
+
+ // Count down once grid(0) observes the client node leaving or failing.
+ // NOTE(review): per IgniteEvents#localListen, returning false unsubscribes the listener after the first
+ // event. That is fine while the client is the only node expected to go; confirm if more nodes get involved.
+ grid(0).events().localListen(e -> {
+ assert e instanceof DiscoveryEvent;
+
+ if (((DiscoveryEvent)e).eventNode().id().equals(clientId))
+ nodeGoneLatch.countDown();
+
+ return false;
+ }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+ AtomicBoolean stopLoading = new AtomicBoolean();
+
+ IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite,
stopLoading);
+
+ // Block the client's streamer requests to grid(0) and wait until batchesPerNode(grid(0)) of them are held.
+ clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+
+ clientCm.waitForBlocked(batchesPerNode(grid(0)));
+
+ // Stop the client asynchronously (cancel = true) while its streamer requests to grid(0) are still blocked.
+ runAsync(() -> stopGrid(client.name(), true));
+
+ // NOTE(review): no timeout, waits indefinitely for the client to leave.
+ nodeGoneLatch.await();
+
+ stopLoading.set(true);
+ loadFut.cancel();
+
+ if (allowOverwrite)
+ createAndCheckSnapshot(null, null);
+ else {
+ // Only the quick partitions check is expected to warn here; the DataStreamer warning must not appear.
+
createAndCheckSnapshot(SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
+ DataStreamerUpdatesHandler.WRN_MSG);
+ }
+ }
+
/**
* Runs DataStreamer asynchronously. Waits while streamer pre-loads some
amount of data.
*
@@ -209,7 +286,7 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
throws InterruptedException {
CountDownLatch preload = new CountDownLatch(10_000);
- IgniteInternalFuture<?> res = GridTestUtils.runAsync(() -> {
+ IgniteInternalFuture<?> res = runAsync(() -> {
try (IgniteDataStreamer<Integer, Object> ds =
ldr.dataStreamer(dfltCacheCfg.getName())) {
ds.allowOverwrite(allowOverwrite);
@@ -227,4 +304,36 @@ public class IgniteClusterSnapshotStreamerTest extends
AbstractSnapshotSelfTest
return res;
}
+
+ /**
+ * Creates snapshot {@code SNAPSHOT_NAME}, checks the expected warning (if any) and then verifies the snapshot.
+ *
+ * @param expWrn Expected part of the snapshot creation warning message, or {@code null} if snapshot creation
+ * must complete without a warning.
+ * @param notexpWrn Part of the warning message that must not appear, or {@code null} to skip this check.
+ * Must be {@code null} if {@code expWrn} is {@code null}.
+ * @throws IgniteCheckedException If the snapshot check fails.
+ */
+ private void createAndCheckSnapshot(String expWrn, String notexpWrn) throws IgniteCheckedException {
+ assert notexpWrn == null || expWrn != null;
+
+ if (expWrn == null)
+ snpMgr.createSnapshot(SNAPSHOT_NAME, null).get();
+ else {
+ Throwable snpWrn = assertThrows(
+ null,
+ () -> snpMgr.createSnapshot(SNAPSHOT_NAME, null).get(),
+ IgniteException.class,
+ expWrn
+ );
+
+ if (notexpWrn != null)
+ assertFalse(snpWrn.getMessage().contains(notexpWrn));
+ }
+
+ // The snapshot check must not fail; it reports conflicts if and only if a creation warning was expected.
+ IdleVerifyResultV2 checkRes = snpMgr.checkSnapshot(SNAPSHOT_NAME, null).get();
+
+ assertTrue(checkRes.exceptions().isEmpty());
+ assertTrue((expWrn != null) == checkRes.hasConflicts());
+ }
+
+ /**
+ * Calculates how many streamer requests the tests wait to be blocked for a node: the default parallel operations
+ * multiplier times the node's data streamer pool size attribute, or times its total CPU count if the attribute
+ * is not set. Presumably this matches the number of batches the DataStreamer sends to the node in parallel
+ * (TODO confirm).
+ *
+ * @param grid Node.
+ * @return Number of batches.
+ */
+ private int batchesPerNode(IgniteEx grid) {
+ Integer poolSize =
grid.localNode().attribute(ATTR_DATA_STREAMER_POOL_SIZE);
+
+ return IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER * (poolSize !=
null ? poolSize :
+ grid.localNode().metrics().getTotalCpus());
+ }
}