Repository: ignite
Updated Branches:
  refs/heads/master de5e577c4 -> 5cf44e8e3


IGNITE-9841: SQL: Improved tracking of LOST partitions for queries when 
persistence is enabled. This closes #5069.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cf44e8e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cf44e8e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cf44e8e

Branch: refs/heads/master
Commit: 5cf44e8e3af75dcbf45e2ad2b230550b27e87923
Parents: de5e577
Author: Stanislav Lukyanov <stanlukya...@gmail.com>
Authored: Fri Nov 2 11:49:00 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Fri Nov 2 11:49:00 2018 +0300

----------------------------------------------------------------------
 .../IgniteCachePartitionLossPolicySelfTest.java | 435 +++++++++++++++++--
 .../h2/twostep/GridReduceQueryExecutor.java     |  21 +
 ...ndexingCachePartitionLossPolicySelfTest.java |  87 +---
 3 files changed, 428 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index 1616e8f..caf0829 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -24,17 +24,23 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
+import junit.framework.AssertionFailedError;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.events.Event;
@@ -81,7 +87,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
     private final AtomicBoolean delayPartExchange = new AtomicBoolean(false);
 
     /** */
-    private final TopologyChanger killSingleNode = new TopologyChanger(false, 
Arrays.asList(3), Arrays.asList(0, 1, 2, 4),0);
+    private final TopologyChanger killSingleNode = new TopologyChanger(false, 
Collections.singletonList(3), Arrays.asList(0, 1, 2, 4), 0);
+
+    /** */
+    private boolean isPersistenceEnabled;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
@@ -103,6 +112,10 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
 
         cfg.setCacheConfiguration(cacheConfiguration());
 
+        cfg.setDataStorageConfiguration(new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new 
DataRegionConfiguration().setPersistenceEnabled(isPersistenceEnabled)
+        ));
+
         return cfg;
     }
 
@@ -122,17 +135,25 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
         delayPartExchange.set(false);
 
+        partLossPlc = PartitionLossPolicy.IGNORE;
+
         backups = 0;
+
+        isPersistenceEnabled = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
     }
 
     /**
@@ -147,6 +168,17 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadOnlySafeWithPersistence() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(false, true, killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadOnlyAll() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
 
@@ -156,6 +188,19 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadOnlyAllWithPersistence() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+        partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(false, false, killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafe() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -165,6 +210,17 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteSafeWithPersistence() throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteAll() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
 
@@ -174,6 +230,19 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteAllWithPersistence() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+        partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, false, killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -183,6 +252,17 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws 
Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(false, 
Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception 
{
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -192,6 +272,17 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() 
throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(false, 
Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeWithBackupsAfterKillThreeNodes() throws 
Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -203,6 +294,21 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void 
testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws 
Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10043");
+
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        backups = 1;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeAfterKillCrd() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -212,6 +318,17 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteSafeAfterKillCrdWithPersistence() throws 
Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeWithBackups() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -223,6 +340,19 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     /**
      * @throws Exception if failed.
      */
+    public void testReadWriteSafeWithBackupsWithPersistence() throws Exception 
{
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        backups = 1;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
     public void testReadWriteSafeWithBackupsAfterKillCrd() throws Exception {
         partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
 
@@ -232,12 +362,81 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     }
 
     /**
-     * @param topChanger topology changer.
      * @throws Exception if failed.
      */
-    public void testIgnore(TopologyChanger topChanger) throws Exception {
+    public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() 
throws Exception {
+        partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+        backups = 1;
+
+        isPersistenceEnabled = true;
+
+        checkLostPartition(true, true, new TopologyChanger(true, 
Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testIgnore() throws Exception {
         fail("https://issues.apache.org/jira/browse/IGNITE-5078");
 
+        partLossPlc = PartitionLossPolicy.IGNORE;
+
+        checkIgnore(killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testIgnoreWithPersistence() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-5078");
+
+        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+        partLossPlc = PartitionLossPolicy.IGNORE;
+
+        isPersistenceEnabled = true;
+
+        checkIgnore(killSingleNode);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testIgnoreKillThreeNodes() throws Exception {
+        partLossPlc = PartitionLossPolicy.IGNORE;
+
+        // TODO aliveNodes should include node 4, but it fails due to 
https://issues.apache.org/jira/browse/IGNITE-5078.
+        // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
+        // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, 
Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
+        TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, 
Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
+
+        checkIgnore(onlyCrdIsAlive);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testIgnoreKillThreeNodesWithPersistence() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+        partLossPlc = PartitionLossPolicy.IGNORE;
+
+        isPersistenceEnabled = true;
+
+        // TODO aliveNodes should include node 4, but it fails due to 
https://issues.apache.org/jira/browse/IGNITE-5078.
+        // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
+        // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, 
Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
+        TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, 
Arrays.asList(1, 2, 3), Collections.singletonList(0), 0);
+
+        checkIgnore(onlyCrdIsAlive);
+    }
+
+    /**
+     * @param topChanger topology changer.
+     * @throws Exception if failed.
+     */
+    private void checkIgnore(TopologyChanger topChanger) throws Exception {
         topChanger.changeTopology();
 
         for (Ignite ig : G.allGrids()) {
@@ -266,14 +465,14 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
     private void checkLostPartition(boolean canWrite, boolean safe, 
TopologyChanger topChanger) throws Exception {
         assert partLossPlc != null;
 
-        int part = topChanger.changeTopology().get(0);
+        List<Integer> lostParts = topChanger.changeTopology();
 
         // Wait for all grids (servers and client) have same topology version
         // to make sure that all nodes received map with lost partition.
-        GridTestUtils.waitForCondition(() -> {
+        boolean success = GridTestUtils.waitForCondition(() -> {
             AffinityTopologyVersion last = null;
             for (Ignite ig : G.allGrids()) {
-                AffinityTopologyVersion ver = ((IgniteEx) 
ig).context().cache().context().exchange().readyAffinityVersion();
+                AffinityTopologyVersion ver = 
((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion();
 
                 if (last != null && !last.equals(ver))
                     return false;
@@ -284,35 +483,57 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
             return true;
         }, 10000);
 
+        assertTrue("Failed to wait for new topology", success);
+
         for (Ignite ig : G.allGrids()) {
             info("Checking node: " + ig.cluster().localNode().id());
 
             IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
 
-            verifyCacheOps(canWrite, safe, part, ig);
+            verifyLostPartitions(ig, lostParts);
 
-            // Check we can read and write to lost partition in recovery mode.
-            IgniteCache<Integer, Integer> recoverCache = 
cache.withPartitionRecover();
+            verifyCacheOps(canWrite, safe, ig);
 
-            for (int lostPart : recoverCache.lostPartitions()) {
-                recoverCache.get(lostPart);
-                recoverCache.put(lostPart, lostPart);
-            }
+            validateQuery(safe, ig);
 
-            // Check that writing in recover mode does not clear partition 
state.
-            verifyCacheOps(canWrite, safe, part, ig);
+            // TODO withPartitionRecover doesn't work with BLT - 
https://issues.apache.org/jira/browse/IGNITE-10041.
+            if (!isPersistenceEnabled) {
+                // Check we can read and write to lost partition in recovery 
mode.
+                IgniteCache<Integer, Integer> recoverCache = 
cache.withPartitionRecover();
+
+                for (int lostPart : recoverCache.lostPartitions()) {
+                    recoverCache.get(lostPart);
+                    recoverCache.put(lostPart, lostPart);
+                }
 
-            // Validate queries.
-            validateQuery(safe, part, ig);
+                // Check that writing in recover mode does not clear partition 
state.
+                verifyLostPartitions(ig, lostParts);
+
+                verifyCacheOps(canWrite, safe, ig);
+
+                validateQuery(safe, ig);
+            }
         }
 
-        // Check that partition state does not change after we start a new 
node.
-        IgniteEx grd = startGrid(3);
+        // Bring all nodes back.
+        for (int i : topChanger.killNodes) {
+            IgniteEx grd = startGrid(i);
 
-        info("Newly started node: " + grd.cluster().localNode().id());
+            info("Newly started node: " + grd.cluster().localNode().id());
 
-        for (Ignite ig : G.allGrids())
-            verifyCacheOps(canWrite, safe, part, ig);
+            // Check that partition state does not change after we start each 
node.
+            // TODO With persistence enabled LOST partitions become OWNING 
after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044.
+            if (!isPersistenceEnabled) {
+                for (Ignite ig : G.allGrids()) {
+                    verifyCacheOps(canWrite, safe, ig);
+
+                    // TODO Query effectively waits for rebalance due to 
https://issues.apache.org/jira/browse/IGNITE-10057
+                    // TODO and after resetLostPartition there is another 
OWNING copy in the cluster due to 
https://issues.apache.org/jira/browse/IGNITE-10058.
+                    // TODO Uncomment after 
https://issues.apache.org/jira/browse/IGNITE-10058 is fixed.
+//                    validateQuery(safe, ig);
+                }
+            }
+        }
 
         ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME));
 
@@ -330,24 +551,40 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
 
                 cache.put(i, i);
             }
+
+            for (int i = 0; i < parts; i++) {
+                checkQueryPasses(ig, false, i);
+
+                if (shouldExecuteLocalQuery(ig, i))
+                    checkQueryPasses(ig, true, i);
+
+            }
+
+            checkQueryPasses(ig, false);
         }
     }
 
     /**
-     *
+     * @param node Node.
+     * @param lostParts Lost partition IDs.
+     */
+    private void verifyLostPartitions(Ignite node, List<Integer> lostParts) {
+        IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME);
+
+        Set<Integer> actualSortedLostParts = new 
TreeSet<>(cache.lostPartitions());
+        Set<Integer> expSortedLostParts = new TreeSet<>(lostParts);
+
+        assertEqualsCollections(expSortedLostParts, actualSortedLostParts);
+    }
+
+    /**
      * @param canWrite {@code True} if writes are allowed.
      * @param safe {@code True} if lost partition should trigger exception.
-     * @param part Lost partition ID.
      * @param ig Ignite instance.
      */
-    private void verifyCacheOps(boolean canWrite, boolean safe, int part, 
Ignite ig) {
+    private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) {
         IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME);
 
-        Collection<Integer> lost = cache.lostPartitions();
-
-        assertTrue("Failed to find expected lost partition [exp=" + part + ", 
lost=" + lost + ']',
-            lost.contains(part));
-
         int parts = ig.affinity(CACHE_NAME).partitions();
 
         // Check read.
@@ -395,7 +632,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
      * @param nodes List of nodes to find partition.
      * @return List of partitions that aren't primary or backup for specified 
nodes.
      */
-    protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
+    private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
         Affinity<Object> aff = ignite(4).affinity(CACHE_NAME);
 
         List<Integer> parts = new ArrayList<>();
@@ -424,15 +661,125 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
      * Validate query execution on a node.
      *
      * @param safe Safe flag.
-     * @param part Partition.
      * @param node Node.
      */
-    protected void validateQuery(boolean safe, int part, Ignite node) {
+    private void validateQuery(boolean safe, Ignite node) {
+        // Get node lost and remaining partitions.
+        IgniteCache<?, ?> cache = node.cache(CACHE_NAME);
+
+        Collection<Integer> lostParts = cache.lostPartitions();
+
+        int part = 
cache.lostPartitions().stream().findFirst().orElseThrow(AssertionFailedError::new);
+
+        Integer remainingPart = null;
+
+        for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) {
+            if (lostParts.contains(i))
+                continue;
+
+            remainingPart = i;
+
+            break;
+        }
+
+        assertNotNull("Failed to find a partition that isn't lost", 
remainingPart);
+
+        // 1. Check query against all partitions.
+        validateQuery0(safe, node);
+
+        // 2. Check query against LOST partition.
+        validateQuery0(safe, node, part);
+
+        // 3. Check query on remaining partition.
+        checkQueryPasses(node, false, remainingPart);
+
+        if (shouldExecuteLocalQuery(node, remainingPart))
+            checkQueryPasses(node, true, remainingPart);
+
+        // 4. Check query over two partitions - normal and LOST.
+        validateQuery0(safe, node, part, remainingPart);
+    }
+
+    /**
+     * Query validation routine.
+     *
+     * @param safe Safe flag.
+     * @param node Node.
+     * @param parts Partitions.
+     */
+    private void validateQuery0(boolean safe, Ignite node, int... parts) {
+        if (safe)
+            checkQueryFails(node, false, parts);
+        else
+            checkQueryPasses(node, false, parts);
+
+        if (shouldExecuteLocalQuery(node, parts)) {
+            if (safe)
+                checkQueryFails(node, true, parts);
+            else
+                checkQueryPasses(node, true, parts);
+        }
+    }
+
+    /**
+     * @return true if the given node is primary for all given partitions.
+     */
+    private boolean shouldExecuteLocalQuery(Ignite node, int... parts) {
+        if (parts == null || parts.length == 0)
+            return false;
+
+        int numOfPrimaryParts = 0;
+
+        for (int nodePrimaryPart : 
node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
+            for (int part : parts) {
+                if (part == nodePrimaryPart)
+                    numOfPrimaryParts++;
+            }
+        }
+
+        return numOfPrimaryParts == parts.length;
+    }
+
+    /**
+     * @param node Node.
+     * @param loc Local flag.
+     * @param parts Partitions.
+     */
+    protected void checkQueryPasses(Ignite node, boolean loc, int... parts) {
+        // Scan queries don't support multiple partitions.
+        if (parts != null && parts.length > 1)
+            return;
+
+        // TODO Local scan queries fail in non-safe modes - 
https://issues.apache.org/jira/browse/IGNITE-10059.
+        if (loc)
+            return;
+
+        IgniteCache cache = node.cache(CACHE_NAME);
+
+        ScanQuery qry = new ScanQuery();
+
+        if (parts != null && parts.length > 0)
+            qry.setPartition(parts[0]);
+
+        if (loc)
+            qry.setLocal(true);
+
+        cache.query(qry).getAll();
+    }
+
+    /**
+     * @param node Node.
+     * @param loc Local flag.
+     * @param parts Partitions.
+     */
+    protected void checkQueryFails(Ignite node, boolean loc, int... parts) {
+        // TODO Scan queries never fail due to partition loss - 
https://issues.apache.org/jira/browse/IGNITE-9902.
+        // TODO Need to add an actual check after 
https://issues.apache.org/jira/browse/IGNITE-9902 is fixed.
         // No-op.
     }
 
     /** */
-    class TopologyChanger {
+    private class TopologyChanger {
         /** Flag to delay partition exchange */
         private boolean delayExchange;
 
@@ -451,7 +798,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
          * @param aliveNodes List of nodes to be alive.
          * @param stopDelay Delay between stopping nodes.
          */
-        public TopologyChanger(boolean delayExchange, List<Integer> killNodes, 
List<Integer> aliveNodes,
+        private TopologyChanger(boolean delayExchange, List<Integer> 
killNodes, List<Integer> aliveNodes,
             long stopDelay) {
             this.delayExchange = delayExchange;
             this.killNodes = killNodes;
@@ -463,9 +810,12 @@ public class IgniteCachePartitionLossPolicySelfTest 
extends GridCommonAbstractTe
          * @return Lost partition ID.
          * @throws Exception If failed.
          */
-        protected List<Integer> changeTopology() throws Exception {
+        private List<Integer> changeTopology() throws Exception {
             startGrids(4);
 
+            if (isPersistenceEnabled)
+                grid(0).cluster().active(true);
+
             Affinity<Object> aff = ignite(0).affinity(CACHE_NAME);
 
             for (int i = 0; i < aff.partitions(); i++)
@@ -497,7 +847,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
 
                 lostMap.add(semaphoreMap);
 
-
                 grid(i).events().localListen(new P1<Event>() {
                     @Override public boolean apply(Event evt) {
                         assert evt.type() == 
EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
@@ -512,7 +861,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
                         return true;
                     }
                 }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
-
             }
 
             if (delayExchange)
@@ -549,5 +897,4 @@ public class IgniteCachePartitionLossPolicySelfTest extends 
GridCommonAbstractTe
             return parts;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b30aa2f..87c8ce9 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -47,6 +47,7 @@ import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -112,6 +113,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
 import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
@@ -1693,6 +1696,24 @@ public class GridReduceQueryExecutor {
         Map<ClusterNode, IntArray> partsMap = null;
         Map<ClusterNode, IntArray> qryMap = null;
 
+        for (int cacheId : cacheIds) {
+            GridCacheContext<?, ?> cctx = cacheContext(cacheId);
+
+            PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+
+            if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE)
+                continue;
+
+            Collection<Integer> lostParts = cctx.topology().lostPartitions();
+
+            for (int part : lostParts) {
+                if (parts == null || Arrays.binarySearch(parts, part) >= 0) {
+                    throw new CacheException("Failed to execute query because 
cache partition has been " +
+                        "lost [cacheName=" + cctx.name() + ", part=" + part + 
']');
+                }
+            }
+        }
+
         if (isPreloadingActive(cacheIds)) {
             if (isReplicatedOnly)
                 nodes = replicatedUnstableDataNodes(cacheIds, qryId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
index f208599..a31a1c6 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java
@@ -23,8 +23,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest;
 
-import java.util.Collection;
-
 /**
  * Partition loss policy test with enabled indexing.
  */
@@ -39,80 +37,27 @@ public class IndexingCachePartitionLossPolicySelfTest 
extends IgniteCachePartiti
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected void validateQuery(boolean safe, int part, Ignite 
node) {
-        // Get node lost and remaining partitions.
-        IgniteCache cache = node.cache(CACHE_NAME);
-
-        Collection<Integer> lostParts = cache.lostPartitions();
-
-        Integer remainingPart = null;
-
-        for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) {
-            if (lostParts.contains(i))
-                continue;
-
-            remainingPart = i;
-
-            break;
-        }
-
-        // Determine whether local query should be executed on that node.
-        boolean execLocQry = false;
-
-        for (int nodePrimaryPart : 
node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) {
-            if (part == nodePrimaryPart) {
-                execLocQry = true;
-
-                break;
-            }
-        }
-
-        // 1. Check query against all partitions.
-        validateQuery0(safe, node, false);
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-7039
-//        if (execLocQry)
-//            validateQuery0(safe, node, true);
-
-        // 2. Check query against LOST partition.
-        validateQuery0(safe, node, false, part);
+    protected void checkQueryPasses(Ignite node, boolean loc, int... parts) {
+        executeQuery(node, loc, parts);
+    }
 
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-7039
-//        if (execLocQry)
-//            validateQuery0(safe, node, true, part);
+    /** {@inheritDoc} */
+    protected void checkQueryFails(Ignite node, boolean loc, int... parts) {
+        // TODO: Local queries ignore partition loss, see 
https://issues.apache.org/jira/browse/IGNITE-7039.
+        if (loc)
+            return;
 
-        // 3. Check query on remaining partition.
-        if (remainingPart != null) {
-            executeQuery(node, false, remainingPart);
+        try {
+            executeQuery(node, loc, parts);
 
-            // 4. Check query over two partitions - normal and LOST.
-            validateQuery0(safe, node, false, part, remainingPart);
+            fail("Exception is not thrown.");
         }
-    }
-
-    /**
-     * Query validation routine.
-     *
-     * @param safe Safe flag.
-     * @param node Node.
-     * @param loc Local flag.
-     * @param parts Partitions.
-     */
-    private void validateQuery0(boolean safe, Ignite node, boolean loc, int... 
parts) {
-        if (safe) {
-            try {
-                executeQuery(node, loc, parts);
+        catch (Exception e) {
+            boolean exp = e.getMessage() != null &&
+                e.getMessage().contains("Failed to execute query because cache 
partition has been lost");
 
-                fail("Exception is not thrown.");
-            }
-            catch (Exception e) {
-                assertTrue(e.getMessage(), e.getMessage() != null &&
-                    e.getMessage().contains("Failed to execute query because 
cache partition has been lost"));
-            }
-        }
-        else {
-            executeQuery(node, loc, parts);
+            if (!exp)
+                throw e;
         }
     }
 

Reply via email to