GEODE-1985: Check for index expression reevalaution using a time window Changing the logic for how to we check to see if an entry may have been concurrently modified while an indexed query is in progress.
The new logic just has a time window, defaulting to 10 minutes. If the entry was changed less than 10 minutes for the query started, we will reevaluate the index expression to make sure the entry is still valid. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/d6afb70d Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/d6afb70d Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/d6afb70d Branch: refs/heads/feature/GEODE-1985 Commit: d6afb70d474e54a63523b3d3245a67e18f547c15 Parents: 24a7204 Author: Dan Smith <[email protected]> Authored: Mon Oct 10 17:41:07 2016 -0700 Committer: Dan Smith <[email protected]> Committed: Mon Oct 24 11:11:15 2016 -0700 ---------------------------------------------------------------------- .../query/internal/index/IndexManager.java | 60 ++----------- .../geode/internal/cache/GemFireCacheImpl.java | 2 +- .../geode/internal/cache/LocalRegion.java | 1 - .../query/internal/IndexManagerJUnitTest.java | 60 +------------ ...AbstractIndexMaintenanceIntegrationTest.java | 92 +++++++++++++++++++- ...aintenanceNoReevaluationIntegrationTest.java | 42 +++++++++ .../internal/index/RangeIndexAPIJUnitTest.java | 6 +- .../cache/PartitionedRegionQueryDUnitTest.java | 37 ++------ 8 files changed, 152 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java index 27f239e..7a05e7c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java @@ -108,12 +108,14 @@ public class IndexManager { public static boolean TEST_RANGEINDEX_ONLY = false; public static final String INDEX_ELEMARRAY_THRESHOLD_PROP = "index_elemarray_threshold"; public static final String INDEX_ELEMARRAY_SIZE_PROP = "index_elemarray_size"; + public static final String IN_PROGRESS_UPDATE_WINDOW_PROP = "index.IN_PROGRESS_UPDATE_WINDOW_MS"; public static final int INDEX_ELEMARRAY_THRESHOLD = Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_THRESHOLD_PROP, "100")); public static final int INDEX_ELEMARRAY_SIZE = Integer.parseInt(System.getProperty(INDEX_ELEMARRAY_SIZE_PROP, "5")); - public final static AtomicLong SAFE_QUERY_TIME = new AtomicLong(0); - public static boolean ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION = true; + + public static long IN_PROGRESS_UPDATE_WINDOW = + Long.getLong(IN_PROGRESS_UPDATE_WINDOW_PROP, 10 * 60 * 1000); /** The NULL constant */ public static final Object NULL = new NullToken(); @@ -143,36 +145,9 @@ public class IndexManager { } /** - * Stores the largest combination of current time + delta If there is a large delta/hiccup in - * timings, this allows us to calculate the correct results for a query but, reevaluate more - * aggressively. But the large hiccup will eventually be rolled off as time is always increasing - * This is a fix for #47475 - * - * @param operationTime the last modified time from version tag - * @param currentCacheTime - */ - public static boolean setIndexBufferTime(long operationTime, long currentCacheTime) { - long timeDifference = currentCacheTime - operationTime; - return setNewLargestValue(SAFE_QUERY_TIME, currentCacheTime + timeDifference); - } - - /** - * only for test purposes This should not be called from any product code. Calls from product code - * will possibly cause continous reevaluation (performance issue) OR incorrect query results - * (functional issue) - **/ - public static void resetIndexBufferTime() { - SAFE_QUERY_TIME.set(0); - } - - /** - * Calculates whether we need to reevluate the key for the region entry We added a way to - * determine whether to reevaluate an entry for query execution The method is to keep track of the - * delta and current time in a single long value The value is then used by the query to determine - * if a region entry needs to be reevaluated, based on subtracting the value with the query - * execution time. This provides a delta + some false positive time (dts) If the dts + last - * modified time of the region entry is > query start time, we can assume that it needs to be - * reevaluated + * Calculates whether we need to reevaluate the key for the region entry We added a way to + * determine whether to reevaluate an entry for query execution. If the start time of the query - + * the last modified time of the entr * * This is to fix bug 47475, where references to region entries can be held by the executing query * either directly or indirectly (iterators can hold references for next) but the values @@ -184,26 +159,7 @@ public class IndexManager { * @param lastModifiedTime */ public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) { - return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION - && queryStartTime <= SAFE_QUERY_TIME.get() - queryStartTime + lastModifiedTime; - } - - /** - * - * @param value - * @param newValue - */ - private static boolean setNewLargestValue(AtomicLong value, long newValue) { - boolean done = false; - while (!done) { - long oldValue = value.get(); - if (oldValue < newValue) { - return value.compareAndSet(oldValue, newValue); - } else { - done = true; - } - } - return false; + return IN_PROGRESS_UPDATE_WINDOW >= queryStartTime - lastModifiedTime; } /** Test Hook */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java index d9d572c..3b04d9a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java @@ -383,7 +383,7 @@ public class GemFireCacheImpl * retrieval operations. It is assumed that the traversal operations on cache servers list vastly * outnumber the mutative operations such as add, remove. */ - private volatile List allCacheServers = new CopyOnWriteArrayList(); + private final List allCacheServers = new CopyOnWriteArrayList(); /** * Controls updates to the list of all gateway senders http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 360c6a9..e40544d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -7369,7 +7369,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, lastModified = cacheTimeMillis(); } entry.updateStatsForPut(lastModified); - IndexManager.setIndexBufferTime(lastModified, cacheTimeMillis()); if (this.statisticsEnabled && !isProxy()) { // do not reschedule if there is already a task in the queue. // this prevents bloat in the TimerTask since cancelled tasks http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/test/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java index 929bd46..852b1a3 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java @@ -40,6 +40,7 @@ import org.apache.geode.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) public class IndexManagerJUnitTest { + public long originalUpdateWindow; @Before public void setUp() throws java.lang.Exception { @@ -49,12 +50,14 @@ public class IndexManagerJUnitTest { region.put("" + i, new Portfolio(i)); // CacheUtils.log(new Portfolio(i)); } + originalUpdateWindow = IndexManager.IN_PROGRESS_UPDATE_WINDOW; } @After public void tearDown() throws java.lang.Exception { CacheUtils.closeCache(); + IndexManager.IN_PROGRESS_UPDATE_WINDOW = originalUpdateWindow; } /** @@ -67,67 +70,12 @@ public class IndexManagerJUnitTest { */ @Test public void testSafeQueryTime() { - IndexManager.resetIndexBufferTime(); - // fake entry update at LMT of 0 and actual time of 10 - // safe query time set in index manager is going to be 20 - assertTrue(IndexManager.setIndexBufferTime(0, 10)); + IndexManager.IN_PROGRESS_UPDATE_WINDOW = 10; // fake query start at actual time of 9, 10, 11 and using the fake LMT of 0 assertTrue(IndexManager.needsRecalculation(9, 0)); assertTrue(IndexManager.needsRecalculation(10, 0)); assertFalse(IndexManager.needsRecalculation(11, 0)); - - assertFalse(IndexManager.needsRecalculation(9, -3)); // old enough updates shouldn't trigger a - // recalc - assertTrue(IndexManager.needsRecalculation(9, -2)); // older updates but falls within the delta - // (false positive) - - assertTrue(IndexManager.needsRecalculation(10, 5)); - // This should eval to true only because of false positives. - assertTrue(IndexManager.needsRecalculation(11, 5)); - - // Now let's assume a new update has occurred, this update delta and time combo still is not - // larger - assertFalse(IndexManager.setIndexBufferTime(0, 9)); - assertFalse(IndexManager.setIndexBufferTime(1, 10)); - - // Now let's assume a new update has occured where the time is larger (enough to roll off the - // large delta) - // but the delta is smaller - assertTrue(IndexManager.setIndexBufferTime(30, 30)); - - // Now that we have a small delta, let's see if a query that was "stuck" would reevaluate - // appropriately - assertTrue(IndexManager.needsRecalculation(9, 0)); - } - - // Let's test for negative delta's or a system that is slower than others in the cluster - @Test - public void testSafeQueryTimeForASlowNode() { - IndexManager.resetIndexBufferTime(); - // fake entry update at LMT of 0 and actual time of 10 - // safe query time set in index manager is going to be -10 - assertTrue(IndexManager.setIndexBufferTime(210, 200)); - - assertFalse(IndexManager.needsRecalculation(200, 190)); - assertFalse(IndexManager.needsRecalculation(200, 200)); - assertTrue(IndexManager.needsRecalculation(200, 210)); - - assertTrue(IndexManager.needsRecalculation(200, 220)); - assertTrue(IndexManager.needsRecalculation(200, 221)); - - // now lets say an entry updates with no delta - assertTrue(IndexManager.setIndexBufferTime(210, 210)); - - assertTrue(IndexManager.needsRecalculation(200, 190)); - assertTrue(IndexManager.needsRecalculation(200, 200)); - assertTrue(IndexManager.needsRecalculation(200, 210)); - - assertTrue(IndexManager.needsRecalculation(200, 220)); - assertTrue(IndexManager.needsRecalculation(200, 221)); - - assertTrue(IndexManager.needsRecalculation(210, 211)); - assertFalse(IndexManager.needsRecalculation(212, 210)); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/AbstractIndexMaintenanceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/AbstractIndexMaintenanceIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/AbstractIndexMaintenanceIntegrationTest.java index 66ed117..68f3392 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/AbstractIndexMaintenanceIntegrationTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/AbstractIndexMaintenanceIntegrationTest.java @@ -14,7 +14,21 @@ */ package org.apache.geode.cache.query.internal.index; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import org.apache.geode.cache.query.FunctionDomainException; +import org.apache.geode.cache.query.NameResolutionException; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryInvocationTargetException; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.TypeMismatchException; +import org.apache.geode.cache.query.internal.QueryObserver; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -33,6 +47,16 @@ import org.apache.geode.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) public abstract class AbstractIndexMaintenanceIntegrationTest { + private Cache cache; + private QueryService queryService; + + @Before + public void setUp() throws Exception { + CacheUtils.startCache(); + cache = CacheUtils.getCache(); + queryService = cache.getQueryService(); + } + @After public void tearDown() throws Exception { @@ -42,13 +66,10 @@ public abstract class AbstractIndexMaintenanceIntegrationTest { @Test public void whenRemovingRegionEntryFromIndexIfEntryDestroyedIsThrownCorrectlyRemoveFromIndexAndNotThrowException() throws Exception { - CacheUtils.startCache(); - Cache cache = CacheUtils.getCache(); LocalRegion region = (LocalRegion) cache.createRegionFactory(RegionShortcut.REPLICATE).create("portfolios"); - QueryService qs = cache.getQueryService(); AbstractIndex statusIndex = - createIndex(qs, "statusIndex", "value.status", "/portfolios.entrySet()"); + createIndex(queryService, "statusIndex", "value.status", "/portfolios.entrySet()"); PortfolioPdx p = new PortfolioPdx(1); region.put("KEY-1", p); @@ -58,6 +79,69 @@ public abstract class AbstractIndexMaintenanceIntegrationTest { statusIndex.removeIndexMapping(entry, IndexProtocol.OTHER_OP); } + @Test + public void queryReturnsEntryThatIsInIndex() throws Exception { + LocalRegion region = + (LocalRegion) cache.createRegionFactory(RegionShortcut.REPLICATE).create("portfolios"); + AbstractIndex statusIndex = createIndex(queryService, "statusIndex", "status", "/portfolios"); + + PortfolioPdx portfolio0 = new PortfolioPdx(0); + PortfolioPdx portfolio1 = new PortfolioPdx(1); + region.put(0, portfolio0); + region.put(1, portfolio1); + executeQuery("select * from /portfolios where status='active'", portfolio0); + assertEquals(1, statusIndex.getStatistics().getTotalUses()); + } + + @Test + public void queryDoesNotReturnDestroyedEntry() throws Exception { + LocalRegion region = + (LocalRegion) cache.createRegionFactory(RegionShortcut.REPLICATE).create("portfolios"); + AbstractIndex statusIndex = createIndex(queryService, "statusIndex", "status", "/portfolios"); + + PortfolioPdx portfolio0 = new PortfolioPdx(0); + region.put(0, portfolio0); + region.destroy(0); + executeQuery("select * from /portfolios where status='active'"); + assertEquals(1, statusIndex.getStatistics().getTotalUses()); + } + + @Test + public void queryDoesNotReturnUpdatedEntryThatNoLongerMatchesExpression() throws Exception { + LocalRegion region = + (LocalRegion) cache.createRegionFactory(RegionShortcut.REPLICATE).create("portfolios"); + AbstractIndex statusIndex = createIndex(queryService, "statusIndex", "status", "/portfolios"); + + PortfolioPdx portfolio0 = new PortfolioPdx(0); + region.put(0, portfolio0); + region.put(0, new PortfolioPdx(1)); + executeQuery("select * from /portfolios where status='active'"); + assertEquals(1, statusIndex.getStatistics().getTotalUses()); + } + + @Test + public void queryReturnsUpdatedEntryThatNowMatchesExpression() throws Exception { + LocalRegion region = + (LocalRegion) cache.createRegionFactory(RegionShortcut.REPLICATE).create("portfolios"); + AbstractIndex statusIndex = createIndex(queryService, "statusIndex", "status", "/portfolios"); + + PortfolioPdx portfolio0 = new PortfolioPdx(0); + region.put(0, new PortfolioPdx(1)); + region.put(0, portfolio0); + assertEquals(0, statusIndex.getStatistics().getTotalUses()); + executeQuery("select * from /portfolios where status='active'", portfolio0); + assertEquals(1, statusIndex.getStatistics().getTotalUses()); + } + + private void executeQuery(String queryString, final Object... expected) + throws FunctionDomainException, TypeMismatchException, NameResolutionException, + QueryInvocationTargetException { + Query query = queryService.newQuery(queryString); + SelectResults results = (SelectResults) query.execute(); + + assertEquals(new HashSet(Arrays.asList(expected)), results.asSet()); + } + protected abstract AbstractIndex createIndex(final QueryService qs, String name, String indexExpression, String regionPath) throws IndexNameConflictException, IndexExistsException, RegionNotFoundException; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexMaintenanceNoReevaluationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexMaintenanceNoReevaluationIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexMaintenanceNoReevaluationIntegrationTest.java new file mode 100644 index 0000000..423c8ba --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/CompactRangeIndexMaintenanceNoReevaluationIntegrationTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.internal.index; + +import org.junit.After; +import org.junit.Before; + +/** + * Runs all the tests of {@link CompactRangeIndexMaintenanceIntegrationTest}, but with reevaluation + * of the query expression after getting an entry from the index is disabled. This ensures we are + * actually removing the entries from the index. + */ +public class CompactRangeIndexMaintenanceNoReevaluationIntegrationTest + extends CompactRangeIndexMaintenanceIntegrationTest { + private long originalWindow; + + @Before + public void disableReevaluation() { + originalWindow = IndexManager.IN_PROGRESS_UPDATE_WINDOW; + IndexManager.IN_PROGRESS_UPDATE_WINDOW = Long.MIN_VALUE; + + } + + @After + public void enableReevaluation() { + IndexManager.IN_PROGRESS_UPDATE_WINDOW = originalWindow; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/RangeIndexAPIJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/RangeIndexAPIJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/RangeIndexAPIJUnitTest.java index f52f3aa..fb12c0f 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/RangeIndexAPIJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/RangeIndexAPIJUnitTest.java @@ -58,11 +58,13 @@ import org.apache.geode.test.junit.categories.IntegrationTest; @Category(IntegrationTest.class) public class RangeIndexAPIJUnitTest { private Region region = null; + private long originalWindow; @Before public void setUp() throws java.lang.Exception { CacheUtils.startCache(); - IndexManager.ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION = false; + originalWindow = IndexManager.IN_PROGRESS_UPDATE_WINDOW; + IndexManager.IN_PROGRESS_UPDATE_WINDOW = Long.MIN_VALUE; region = CacheUtils.createRegion("portfolios", Portfolio.class); for (int i = 0; i < 12; i++) { // CacheUtils.log(new Portfolio(i)); @@ -78,7 +80,7 @@ public class RangeIndexAPIJUnitTest { @After public void tearDown() throws java.lang.Exception { - IndexManager.ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION = true; + IndexManager.IN_PROGRESS_UPDATE_WINDOW = originalWindow; CacheUtils.closeCache(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d6afb70d/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java index 0196542..b3cbf1f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryDUnitTest.java @@ -112,40 +112,13 @@ public class PartitionedRegionQueryDUnitTest extends JUnit4CacheTestCase { } }); - vm0.invoke(new SerializableRunnable("resetting sqt") { - public void run() { - IndexManager.setIndexBufferTime(Long.MAX_VALUE, Long.MAX_VALUE); - } - }); - - vm1.invoke(new SerializableRunnable("resetting sqt") { - public void run() { - IndexManager.setIndexBufferTime(Long.MAX_VALUE, Long.MAX_VALUE); - } - }); - - vm0.invoke(new SerializableRunnable("query") { - public void run() { - try { - QueryService qs = getCache().getQueryService(); - qs.newQuery( - "SELECT DISTINCT entry.key, entry.value FROM /region.entrySet entry WHERE entry.value.score >= 5 AND entry.value.score <= 10 ORDER BY value asc") - .execute(); - } catch (QueryInvocationTargetException e) { - e.printStackTrace(); - fail(e.toString()); - } catch (NameResolutionException e) { - fail(e.toString()); - } catch (TypeMismatchException e) { - fail(e.toString()); - } catch (FunctionDomainException e) { - fail(e.toString()); - - } - - } + vm0.invoke(() -> { + QueryService qs = getCache().getQueryService(); + qs.newQuery( + "SELECT DISTINCT entry.key, entry.value FROM /region.entrySet entry WHERE entry.value.score >= 5 AND entry.value.score <= 10 ORDER BY value asc") + .execute(); }); }
