Repository: ignite Updated Branches: refs/heads/master de5e577c4 -> 5cf44e8e3
IGNITE-9841: SQL: Improved tracking of LOST partitions for queries when persistence is enabled. This closes #5069. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5cf44e8e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5cf44e8e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5cf44e8e Branch: refs/heads/master Commit: 5cf44e8e3af75dcbf45e2ad2b230550b27e87923 Parents: de5e577 Author: Stanislav Lukyanov <stanlukya...@gmail.com> Authored: Fri Nov 2 11:49:00 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Fri Nov 2 11:49:00 2018 +0300 ---------------------------------------------------------------------- .../IgniteCachePartitionLossPolicySelfTest.java | 435 +++++++++++++++++-- .../h2/twostep/GridReduceQueryExecutor.java | 21 + ...ndexingCachePartitionLossPolicySelfTest.java | 87 +--- 3 files changed, 428 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java index 1616e8f..caf0829 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java @@ -24,17 +24,23 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; +import junit.framework.AssertionFailedError; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; @@ -81,7 +87,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe private final AtomicBoolean delayPartExchange = new AtomicBoolean(false); /** */ - private final TopologyChanger killSingleNode = new TopologyChanger(false, Arrays.asList(3), Arrays.asList(0, 1, 2, 4),0); + private final TopologyChanger killSingleNode = new TopologyChanger(false, Collections.singletonList(3), Arrays.asList(0, 1, 2, 4), 0); + + /** */ + private boolean isPersistenceEnabled; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -103,6 +112,10 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe cfg.setCacheConfiguration(cacheConfiguration()); + cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(isPersistenceEnabled) + )); + return cfg; } @@ -122,17 +135,25 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); delayPartExchange.set(false); + partLossPlc = PartitionLossPolicy.IGNORE; + backups = 0; + + isPersistenceEnabled = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); } /** @@ -147,6 +168,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadOnlySafeWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE; + + isPersistenceEnabled = true; + + checkLostPartition(false, true, killSingleNode); + } + + /** + * @throws Exception if failed. + */ public void testReadOnlyAll() throws Exception { partLossPlc = PartitionLossPolicy.READ_ONLY_ALL; @@ -156,6 +188,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadOnlyAllWithPersistence() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10041"); + + partLossPlc = PartitionLossPolicy.READ_ONLY_ALL; + + isPersistenceEnabled = true; + + checkLostPartition(false, false, killSingleNode); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafe() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -165,6 +210,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, killSingleNode); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteAll() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_ALL; @@ -174,6 +230,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteAllWithPersistence() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10041"); + + partLossPlc = PartitionLossPolicy.READ_WRITE_ALL; + + isPersistenceEnabled = true; + + checkLostPartition(true, false, killSingleNode); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeAfterKillTwoNodes() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -183,6 +252,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -192,6 +272,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(false, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 20)); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeWithBackupsAfterKillThreeNodes() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -203,6 +294,21 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10043"); + + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + backups = 1; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2, 1), Arrays.asList(0, 4), 0)); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeAfterKillCrd() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -212,6 +318,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeWithBackups() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -223,6 +340,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe /** * @throws Exception if failed. */ + public void testReadWriteSafeWithBackupsWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + backups = 1; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 2), Arrays.asList(0, 1, 4), 0)); + } + + /** + * @throws Exception if failed. + */ public void testReadWriteSafeWithBackupsAfterKillCrd() throws Exception { partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; @@ -232,12 +362,81 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe } /** - * @param topChanger topology changer. * @throws Exception if failed. */ - public void testIgnore(TopologyChanger topChanger) throws Exception { + public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception { + partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE; + + backups = 1; + + isPersistenceEnabled = true; + + checkLostPartition(true, true, new TopologyChanger(true, Arrays.asList(3, 0), Arrays.asList(1, 2, 4), 0)); + } + + /** + * @throws Exception if failed. + */ + public void testIgnore() throws Exception { fail("https://issues.apache.org/jira/browse/IGNITE-5078"); + partLossPlc = PartitionLossPolicy.IGNORE; + + checkIgnore(killSingleNode); + } + + /** + * @throws Exception if failed. + */ + public void testIgnoreWithPersistence() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-5078"); + + fail("https://issues.apache.org/jira/browse/IGNITE-10041"); + + partLossPlc = PartitionLossPolicy.IGNORE; + + isPersistenceEnabled = true; + + checkIgnore(killSingleNode); + } + + /** + * @throws Exception if failed. + */ + public void testIgnoreKillThreeNodes() throws Exception { + partLossPlc = PartitionLossPolicy.IGNORE; + + // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078. + // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed. + // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0); + TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0); + + checkIgnore(onlyCrdIsAlive); + } + + /** + * @throws Exception if failed. + */ + public void testIgnoreKillThreeNodesWithPersistence() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-10041"); + + partLossPlc = PartitionLossPolicy.IGNORE; + + isPersistenceEnabled = true; + + // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078. + // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed. + // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0); + TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Collections.singletonList(0), 0); + + checkIgnore(onlyCrdIsAlive); + } + + /** + * @param topChanger topology changer. + * @throws Exception if failed. + */ + private void checkIgnore(TopologyChanger topChanger) throws Exception { topChanger.changeTopology(); for (Ignite ig : G.allGrids()) { @@ -266,14 +465,14 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception { assert partLossPlc != null; - int part = topChanger.changeTopology().get(0); + List<Integer> lostParts = topChanger.changeTopology(); // Wait for all grids (servers and client) have same topology version // to make sure that all nodes received map with lost partition. - GridTestUtils.waitForCondition(() -> { + boolean success = GridTestUtils.waitForCondition(() -> { AffinityTopologyVersion last = null; for (Ignite ig : G.allGrids()) { - AffinityTopologyVersion ver = ((IgniteEx) ig).context().cache().context().exchange().readyAffinityVersion(); + AffinityTopologyVersion ver = ((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion(); if (last != null && !last.equals(ver)) return false; @@ -284,35 +483,57 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe return true; }, 10000); + assertTrue("Failed to wait for new topology", success); + for (Ignite ig : G.allGrids()) { info("Checking node: " + ig.cluster().localNode().id()); IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); - verifyCacheOps(canWrite, safe, part, ig); + verifyLostPartitions(ig, lostParts); - // Check we can read and write to lost partition in recovery mode. - IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover(); + verifyCacheOps(canWrite, safe, ig); - for (int lostPart : recoverCache.lostPartitions()) { - recoverCache.get(lostPart); - recoverCache.put(lostPart, lostPart); - } + validateQuery(safe, ig); - // Check that writing in recover mode does not clear partition state. - verifyCacheOps(canWrite, safe, part, ig); + // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041. + if (!isPersistenceEnabled) { + // Check we can read and write to lost partition in recovery mode. + IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover(); + + for (int lostPart : recoverCache.lostPartitions()) { + recoverCache.get(lostPart); + recoverCache.put(lostPart, lostPart); + } - // Validate queries. - validateQuery(safe, part, ig); + // Check that writing in recover mode does not clear partition state. + verifyLostPartitions(ig, lostParts); + + verifyCacheOps(canWrite, safe, ig); + + validateQuery(safe, ig); + } } - // Check that partition state does not change after we start a new node. - IgniteEx grd = startGrid(3); + // Bring all nodes back. + for (int i : topChanger.killNodes) { + IgniteEx grd = startGrid(i); - info("Newly started node: " + grd.cluster().localNode().id()); + info("Newly started node: " + grd.cluster().localNode().id()); - for (Ignite ig : G.allGrids()) - verifyCacheOps(canWrite, safe, part, ig); + // Check that partition state does not change after we start each node. + // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044. + if (!isPersistenceEnabled) { + for (Ignite ig : G.allGrids()) { + verifyCacheOps(canWrite, safe, ig); + + // TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057 + // TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058. + // TODO Uncomment after https://issues.apache.org/jira/browse/IGNITE-10058 is fixed. +// validateQuery(safe, ig); + } + } + } ignite(4).resetLostPartitions(Collections.singletonList(CACHE_NAME)); @@ -330,24 +551,40 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe cache.put(i, i); } + + for (int i = 0; i < parts; i++) { + checkQueryPasses(ig, false, i); + + if (shouldExecuteLocalQuery(ig, i)) + checkQueryPasses(ig, true, i); + + } + + checkQueryPasses(ig, false); } } /** - * + * @param node Node. + * @param lostParts Lost partition IDs. + */ + private void verifyLostPartitions(Ignite node, List<Integer> lostParts) { + IgniteCache<Integer, Integer> cache = node.cache(CACHE_NAME); + + Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions()); + Set<Integer> expSortedLostParts = new TreeSet<>(lostParts); + + assertEqualsCollections(expSortedLostParts, actualSortedLostParts); + } + + /** * @param canWrite {@code True} if writes are allowed. * @param safe {@code True} if lost partition should trigger exception. - * @param part Lost partition ID. * @param ig Ignite instance. */ - private void verifyCacheOps(boolean canWrite, boolean safe, int part, Ignite ig) { + private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) { IgniteCache<Integer, Integer> cache = ig.cache(CACHE_NAME); - Collection<Integer> lost = cache.lostPartitions(); - - assertTrue("Failed to find expected lost partition [exp=" + part + ", lost=" + lost + ']', - lost.contains(part)); - int parts = ig.affinity(CACHE_NAME).partitions(); // Check read. @@ -395,7 +632,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * @param nodes List of nodes to find partition. * @return List of partitions that aren't primary or backup for specified nodes. */ - protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) { + private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) { Affinity<Object> aff = ignite(4).affinity(CACHE_NAME); List<Integer> parts = new ArrayList<>(); @@ -424,15 +661,125 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * Validate query execution on a node. * * @param safe Safe flag. - * @param part Partition. * @param node Node. */ - protected void validateQuery(boolean safe, int part, Ignite node) { + private void validateQuery(boolean safe, Ignite node) { + // Get node lost and remaining partitions. + IgniteCache<?, ?> cache = node.cache(CACHE_NAME); + + Collection<Integer> lostParts = cache.lostPartitions(); + + int part = cache.lostPartitions().stream().findFirst().orElseThrow(AssertionFailedError::new); + + Integer remainingPart = null; + + for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) { + if (lostParts.contains(i)) + continue; + + remainingPart = i; + + break; + } + + assertNotNull("Failed to find a partition that isn't lost", remainingPart); + + // 1. Check query against all partitions. + validateQuery0(safe, node); + + // 2. Check query against LOST partition. + validateQuery0(safe, node, part); + + // 3. Check query on remaining partition. + checkQueryPasses(node, false, remainingPart); + + if (shouldExecuteLocalQuery(node, remainingPart)) + checkQueryPasses(node, true, remainingPart); + + // 4. Check query over two partitions - normal and LOST. + validateQuery0(safe, node, part, remainingPart); + } + + /** + * Query validation routine. + * + * @param safe Safe flag. + * @param node Node. + * @param parts Partitions. + */ + private void validateQuery0(boolean safe, Ignite node, int... parts) { + if (safe) + checkQueryFails(node, false, parts); + else + checkQueryPasses(node, false, parts); + + if (shouldExecuteLocalQuery(node, parts)) { + if (safe) + checkQueryFails(node, true, parts); + else + checkQueryPasses(node, true, parts); + } + } + + /** + * @return true if the given node is primary for all given partitions. + */ + private boolean shouldExecuteLocalQuery(Ignite node, int... parts) { + if (parts == null || parts.length == 0) + return false; + + int numOfPrimaryParts = 0; + + for (int nodePrimaryPart : node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) { + for (int part : parts) { + if (part == nodePrimaryPart) + numOfPrimaryParts++; + } + } + + return numOfPrimaryParts == parts.length; + } + + /** + * @param node Node. + * @param loc Local flag. + * @param parts Partitions. + */ + protected void checkQueryPasses(Ignite node, boolean loc, int... parts) { + // Scan queries don't support multiple partitions. + if (parts != null && parts.length > 1) + return; + + // TODO Local scan queries fail in non-safe modes - https://issues.apache.org/jira/browse/IGNITE-10059. + if (loc) + return; + + IgniteCache cache = node.cache(CACHE_NAME); + + ScanQuery qry = new ScanQuery(); + + if (parts != null && parts.length > 0) + qry.setPartition(parts[0]); + + if (loc) + qry.setLocal(true); + + cache.query(qry).getAll(); + } + + /** + * @param node Node. + * @param loc Local flag. + * @param parts Partitions. + */ + protected void checkQueryFails(Ignite node, boolean loc, int... parts) { + // TODO Scan queries never fail due to partition loss - https://issues.apache.org/jira/browse/IGNITE-9902. + // TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed. // No-op. } /** */ - class TopologyChanger { + private class TopologyChanger { /** Flag to delay partition exchange */ private boolean delayExchange; @@ -451,7 +798,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * @param aliveNodes List of nodes to be alive. * @param stopDelay Delay between stopping nodes. */ - public TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes, + private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes, long stopDelay) { this.delayExchange = delayExchange; this.killNodes = killNodes; @@ -463,9 +810,12 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe * @return Lost partition ID. * @throws Exception If failed. */ - protected List<Integer> changeTopology() throws Exception { + private List<Integer> changeTopology() throws Exception { startGrids(4); + if (isPersistenceEnabled) + grid(0).cluster().active(true); + Affinity<Object> aff = ignite(0).affinity(CACHE_NAME); for (int i = 0; i < aff.partitions(); i++) @@ -497,7 +847,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe lostMap.add(semaphoreMap); - grid(i).events().localListen(new P1<Event>() { @Override public boolean apply(Event evt) { assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; @@ -512,7 +861,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe return true; } }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); - } if (delayExchange) @@ -549,5 +897,4 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe return parts; } } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index b30aa2f..87c8ce9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -47,6 +47,7 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -112,6 +113,8 @@ import org.jetbrains.annotations.Nullable; import static java.util.Collections.singletonList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; +import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; @@ -1693,6 +1696,24 @@ public class GridReduceQueryExecutor { Map<ClusterNode, IntArray> partsMap = null; Map<ClusterNode, IntArray> qryMap = null; + for (int cacheId : cacheIds) { + GridCacheContext<?, ?> cctx = cacheContext(cacheId); + + PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); + + if (plc != READ_ONLY_SAFE && plc != READ_WRITE_SAFE) + continue; + + Collection<Integer> lostParts = cctx.topology().lostPartitions(); + + for (int part : lostParts) { + if (parts == null || Arrays.binarySearch(parts, part) >= 0) { + throw new CacheException("Failed to execute query because cache partition has been " + + "lost [cacheName=" + cctx.name() + ", part=" + part + ']'); + } + } + } + if (isPreloadingActive(cacheIds)) { if (isReplicatedOnly) nodes = replicatedUnstableDataNodes(cacheIds, qryId); http://git-wip-us.apache.org/repos/asf/ignite/blob/5cf44e8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java index f208599..a31a1c6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java @@ -23,8 +23,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest; -import java.util.Collection; - /** * Partition loss policy test with enabled indexing. */ @@ -39,80 +37,27 @@ public class IndexingCachePartitionLossPolicySelfTest extends IgniteCachePartiti } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected void validateQuery(boolean safe, int part, Ignite node) { - // Get node lost and remaining partitions. - IgniteCache cache = node.cache(CACHE_NAME); - - Collection<Integer> lostParts = cache.lostPartitions(); - - Integer remainingPart = null; - - for (int i = 0; i < node.affinity(CACHE_NAME).partitions(); i++) { - if (lostParts.contains(i)) - continue; - - remainingPart = i; - - break; - } - - // Determine whether local query should be executed on that node. - boolean execLocQry = false; - - for (int nodePrimaryPart : node.affinity(CACHE_NAME).primaryPartitions(node.cluster().localNode())) { - if (part == nodePrimaryPart) { - execLocQry = true; - - break; - } - } - - // 1. Check query against all partitions. - validateQuery0(safe, node, false); - - // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 -// if (execLocQry) -// validateQuery0(safe, node, true); - - // 2. Check query against LOST partition. - validateQuery0(safe, node, false, part); + protected void checkQueryPasses(Ignite node, boolean loc, int... parts) { + executeQuery(node, loc, parts); + } - // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 -// if (execLocQry) -// validateQuery0(safe, node, true, part); + /** {@inheritDoc} */ + protected void checkQueryFails(Ignite node, boolean loc, int... parts) { + // TODO: Local queries ignore partition loss, see https://issues.apache.org/jira/browse/IGNITE-7039. + if (loc) + return; - // 3. Check query on remaining partition. - if (remainingPart != null) { - executeQuery(node, false, remainingPart); + try { + executeQuery(node, loc, parts); - // 4. Check query over two partitions - normal and LOST. - validateQuery0(safe, node, false, part, remainingPart); + fail("Exception is not thrown."); } - } - - /** - * Query validation routine. - * - * @param safe Safe flag. - * @param node Node. - * @param loc Local flag. - * @param parts Partitions. - */ - private void validateQuery0(boolean safe, Ignite node, boolean loc, int... parts) { - if (safe) { - try { - executeQuery(node, loc, parts); + catch (Exception e) { + boolean exp = e.getMessage() != null && + e.getMessage().contains("Failed to execute query because cache partition has been lost"); - fail("Exception is not thrown."); - } - catch (Exception e) { - assertTrue(e.getMessage(), e.getMessage() != null && - e.getMessage().contains("Failed to execute query because cache partition has been lost")); - } - } - else { - executeQuery(node, loc, parts); + if (!exp) + throw e; } }