This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 501fb9d7f9c IGNITE-18075 Added partition size and counter check on 
snapshot create. (#10363)
501fb9d7f9c is described below

commit 501fb9d7f9c54e5693247ec25c4aa43876271c81
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Nov 18 10:33:01 2022 +0300

    IGNITE-18075 Added partition size and counter check on snapshot create. 
(#10363)
---
 .../snapshot/IgniteSnapshotManager.java            |   5 +-
 .../SnapshotPartitionsQuickVerifyHandler.java      | 100 +++++++++++++++
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |  14 ++-
 .../IgniteClusterSnapshotStreamerTest.java         | 135 +++++++++++++++++++--
 4 files changed, 238 insertions(+), 16 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index a08d8d372c4..75867df52fb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1061,7 +1061,7 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     public void streamerWarning() {
         SnapshotOperationRequest snpTask = currentCreateRequest();
 
-        if (snpTask != null)
+        if (snpTask != null && !snpTask.streamerWarning())
             snpTask.streamerWarning(true);
     }
 
@@ -2232,6 +2232,9 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
             // Register system default DataStreamer updates check.
             registerHandler(new DataStreamerUpdatesHandler());
 
+            // Register system default partition size and counters check that is 
used at the creation operation.
+            registerHandler(new 
SnapshotPartitionsQuickVerifyHandler(ctx.cache().context()));
+
             // Register custom handlers.
             SnapshotHandler<Object>[] extHnds = 
(SnapshotHandler<Object>[])ctx.plugins().extensions(SnapshotHandler.class);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
new file mode 100644
index 00000000000..97a94e0313b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Quick partitions verifier. Warns if partition counters or size are different 
among the nodes, which can be caused by
+ * canceled/failed DataStreamer. Skips checking if the DataStreamer warning is 
detected.
+ */
+public class SnapshotPartitionsQuickVerifyHandler extends 
SnapshotPartitionsVerifyHandler {
+    /** */
+    public static final String WRN_MSG = "This may happen if DataStreamer with 
property 'allowOverwrite' set " +
+        "to `false` is loading during the snapshot or hadn't successfully 
finished earlier. However, you will be " +
+        "able restore rest the caches from this snapshot.";
+
+    /**
+     * @param cctx Shared context.
+     */
+    public SnapshotPartitionsQuickVerifyHandler(GridCacheSharedContext<?, ?> 
cctx) {
+        super(cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SnapshotHandlerType type() {
+        return SnapshotHandlerType.CREATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<PartitionKeyV2, PartitionHashRecordV2> 
invoke(SnapshotHandlerContext opCtx)
+        throws IgniteCheckedException {
+        // Return null not to check partitions at all if the streamer warning 
is detected.
+        if (opCtx.streamerWarning())
+            return null;
+
+        Map<PartitionKeyV2, PartitionHashRecordV2> res = super.invoke(opCtx);
+
+        assert res != null;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void complete(
+        String name,
+        Collection<SnapshotHandlerResult<Map<PartitionKeyV2, 
PartitionHashRecordV2>>> results
+    ) throws IgniteCheckedException {
+        if (results.stream().anyMatch(r -> r.data() == null))
+            return;
+
+        Set<Integer> wrnGrps = new HashSet<>();
+
+        Map<PartitionKeyV2, PartitionHashRecordV2> total = new HashMap<>();
+
+        F.viewReadOnly(results, SnapshotHandlerResult::data).forEach(m -> 
m.forEach((part, val) -> {
+            PartitionHashRecordV2 other = total.putIfAbsent(part, val);
+
+            if (other == null)
+                return;
+
+            if (val.size() != other.size() || 
!val.updateCounter().equals(other.updateCounter()))
+                wrnGrps.add(part.groupId());
+        }));
+
+        if (!wrnGrps.isEmpty()) {
+            throw new SnapshotHandlerWarningException("Cache partitions differ 
for cache groups " + S.compact(wrnGrps)
+                + ". " + WRN_MSG);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean skipHash() {
+        return true;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 6c408491d66..1a0ea4f1e68 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -81,7 +81,7 @@ import static 
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtili
  */
 public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<PartitionKeyV2, PartitionHashRecordV2>> {
     /** Shared context. */
-    private final GridCacheSharedContext<?, ?> cctx;
+    protected final GridCacheSharedContext<?, ?> cctx;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -219,7 +219,8 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
                             GridDhtPartitionState.OWNING,
                             false,
                             size,
-                            snpMgr.partitionRowIterator(snpCtx, grpName, 
partId, pageStore));
+                            skipHash() ? F.emptyIterator()
+                                : snpMgr.partitionRowIterator(snpCtx, grpName, 
partId, pageStore));
 
                         assert hash != null : "OWNING must have hash: " + key;
 
@@ -275,6 +276,15 @@ public class SnapshotPartitionsVerifyHandler implements 
SnapshotHandler<Map<Part
         throw new IgniteCheckedException(buf.toString());
     }
 
+    /**
+     * Provides flag of skipping the full hash calculation.
+     *
+     * @return {@code True} if full partition hash calculation must be skipped. 
{@code False} otherwise.
+     */
+    protected boolean skipHash() {
+        return false;
+    }
+
     /**
      * Provides encryption keys stored within snapshot.
      * <p>
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
index c784f3b6f2f..1d1cd6d7e19 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotStreamerTest.java
@@ -18,26 +18,34 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
 /**
- * Tests snapshot is consistent or snapshot process produces proper warning.
+ * Tests snapshot is consistent or snapshot process produces proper warning 
with concurrent streaming.
  */
 public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest {
     /** */
@@ -83,19 +91,35 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
     }
 
     /**
-     * Tests snapshot consistency wnen streamer starts before snapshot. 
Default receiver.
+     * Tests snapshot warning when streamer is working during snapshot 
creation. Default receiver.
      */
     @Test
     public void testStreamerWhileSnapshotDefault() throws Exception {
-        doTestDataStreamerWhileSnapshot( false);
+        doTestDataStreamerWhileSnapshot(false);
     }
 
     /**
-     * Tests snapshot consistency wnen streamer starts before snapshot. 
Overwriting receiver.
+     * Tests snapshot warning when streamer is working during snapshot 
creation. Overwriting receiver.
      */
     @Test
     public void testStreamerWhileSnapshotOverwriting() throws Exception {
-        doTestDataStreamerWhileSnapshot( true);
+        doTestDataStreamerWhileSnapshot(true);
+    }
+
+    /**
+     * Tests snapshot warning when streamer failed or canceled before 
snapshot. Default receiver.
+     */
+    @Test
+    public void testStreamerFailsLongAgoDefault() throws Exception {
+        doTestDataStreamerFailedBeforeSnapshot(false);
+    }
+
+    /**
+     * Tests snapshot warning when streamer failed or canceled before 
snapshot. Overwriting receiver.
+     */
+    @Test
+    public void testStreamerFailsLongAgoOverwriting() throws Exception {
+        doTestDataStreamerFailedBeforeSnapshot(true);
     }
 
     /**
@@ -156,7 +180,7 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
 
         AtomicBoolean stop = new AtomicBoolean();
 
-        IgniteInternalFuture<?> loadFut = runLoad(grid(2), false, stop);
+        IgniteInternalFuture<?> loadFut = runLoad(client, false, stop);
 
         try {
             snpMgr.createSnapshot(SNAPSHOT_NAME).get();
@@ -175,29 +199,82 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
     }
 
     /**
-     * Tests snapshot process throws warning if required.
+     * Tests snapshot warning when streamer is working during snapshot 
creation.
      *
      * @param allowOverwrite 'allowOverwrite' setting.
      */
     private void doTestDataStreamerWhileSnapshot(boolean allowOverwrite) 
throws Exception {
-        AtomicBoolean stopLoading = new AtomicBoolean(false);
+        AtomicBoolean stopLoading = new AtomicBoolean();
+
+        TestRecordingCommunicationSpi clientCm =
+            
(TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
 
         IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite, 
stopLoading);
 
+        clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+
+        clientCm.waitForBlocked(batchesPerNode(grid(0)));
+
         try {
             if (allowOverwrite)
-                grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+                createAndCheckSnapshot(null, null);
             else {
-                assertThrows(null, () -> 
snpMgr.createSnapshot(SNAPSHOT_NAME).get(),
-                    IgniteException.class, DataStreamerUpdatesHandler.WRN_MSG);
+                createAndCheckSnapshot(DataStreamerUpdatesHandler.WRN_MSG,
+                    SnapshotPartitionsQuickVerifyHandler.WRN_MSG);
             }
         }
         finally {
+            clientCm.stopBlock();
             stopLoading.set(true);
             loadFut.get();
         }
     }
 
+    /**
+     * Tests snapshot warning when streamer failed or canceled before snapshot.
+     *
+     * @param allowOverwrite 'allowOverwrite' setting.
+     */
+    private void doTestDataStreamerFailedBeforeSnapshot(boolean 
allowOverwrite) throws Exception {
+        TestRecordingCommunicationSpi clientCm =
+            
(TestRecordingCommunicationSpi)client.configuration().getCommunicationSpi();
+
+        UUID clientId = client.localNode().id();
+
+        CountDownLatch nodeGoneLatch = new CountDownLatch(1);
+
+        grid(0).events().localListen(e -> {
+            assert e instanceof DiscoveryEvent;
+
+            if (((DiscoveryEvent)e).eventNode().id().equals(clientId))
+                nodeGoneLatch.countDown();
+
+            return false;
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
+        AtomicBoolean stopLoading = new AtomicBoolean();
+
+        IgniteInternalFuture<?> loadFut = runLoad(client, allowOverwrite, 
stopLoading);
+
+        clientCm.blockMessages(DataStreamerRequest.class, grid(0).name());
+
+        clientCm.waitForBlocked(batchesPerNode(grid(0)));
+
+        runAsync(() -> stopGrid(client.name(), true));
+
+        nodeGoneLatch.await();
+
+        stopLoading.set(true);
+        loadFut.cancel();
+
+        if (allowOverwrite)
+            createAndCheckSnapshot(null, null);
+        else {
+            
createAndCheckSnapshot(SnapshotPartitionsQuickVerifyHandler.WRN_MSG,
+                DataStreamerUpdatesHandler.WRN_MSG);
+        }
+    }
+
     /**
      * Runs DataStreamer asynchronously. Waits while streamer pre-loads some 
amount of data.
      *
@@ -209,7 +286,7 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
         throws InterruptedException {
         CountDownLatch preload = new CountDownLatch(10_000);
 
-        IgniteInternalFuture<?> res = GridTestUtils.runAsync(() -> {
+        IgniteInternalFuture<?> res = runAsync(() -> {
             try (IgniteDataStreamer<Integer, Object> ds = 
ldr.dataStreamer(dfltCacheCfg.getName())) {
                 ds.allowOverwrite(allowOverwrite);
 
@@ -227,4 +304,36 @@ public class IgniteClusterSnapshotStreamerTest extends 
AbstractSnapshotSelfTest
 
         return res;
     }
+
+    /** */
+    private void createAndCheckSnapshot(String expWrn, String notexpWrn) 
throws IgniteCheckedException {
+        assert notexpWrn == null || expWrn != null;
+
+        if (expWrn == null)
+            snpMgr.createSnapshot(SNAPSHOT_NAME, null).get();
+        else {
+            Throwable snpWrn = assertThrows(
+                null,
+                () -> snpMgr.createSnapshot(SNAPSHOT_NAME, null).get(),
+                IgniteException.class,
+                expWrn
+            );
+
+            if (notexpWrn != null)
+                assertTrue(!snpWrn.getMessage().contains(notexpWrn));
+        }
+
+        IdleVerifyResultV2 checkRes = snpMgr.checkSnapshot(SNAPSHOT_NAME, 
null).get();
+
+        assertTrue(checkRes.exceptions().isEmpty());
+        assertTrue((expWrn != null) == checkRes.hasConflicts());
+    }
+
+    /** */
+    private int batchesPerNode(IgniteEx grid) {
+        Integer poolSize = 
grid.localNode().attribute(ATTR_DATA_STREAMER_POOL_SIZE);
+
+        return IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER * (poolSize != 
null ? poolSize :
+            grid.localNode().metrics().getTotalCpus());
+    }
 }

Reply via email to