This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new a1f747c GEODE-5440: when we need re-evaluate a entry in a index, we need to pass in the outer value key in the equiJoin. (#2338) a1f747c is described below commit a1f747ca126a85192ac3c36183b9bfc9ae5116f9 Author: jinmeiliao <jil...@pivotal.io> AuthorDate: Thu Aug 23 15:24:01 2018 -0700 GEODE-5440: when we need re-evaluate a entry in a index, we need to pass in the outer value key in the equiJoin. (#2338) * rewrite the flaky test with an integration test * pass in the value key when doing the equiJoin * code clean up. --- .../partitioned/PRColocatedEquiJoinDUnitTest.java | 47 --------- .../query/partitioned/PRQueryDUnitHelper.java | 86 +++------------- .../query/internal/IndexManagerJUnitTest.java | 12 +-- .../query/partitioned/PRColocatedEquiJoinTest.java | 110 +++++++++++++++++++++ .../query/internal/IndexTrackingQueryObserver.java | 1 + .../geode/cache/query/internal/QueryUtils.java | 10 +- .../cache/query/internal/index/IndexManager.java | 30 +++--- .../cache/query/internal/index/RangeIndex.java | 7 +- .../geode/test/dunit/rules/ClusterStartupRule.java | 8 ++ .../geode/test/junit/rules/ServerStarterRule.java | 21 ++++ 10 files changed, 175 insertions(+), 157 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java index 29ff500..586cafa 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java @@ -828,53 +828,6 @@ public class PRColocatedEquiJoinDUnitTest extends CacheTestCase { } @Test - public void testRRPRLocalQueryingWithHetroIndexes() throws Exception { - Host host = Host.getHost(0); - VM vm0 = host.getVM(0); - setCacheInVMs(vm0); - - // Creating PR's on the participating VM's - // Creating DataStore node on the VM0. - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRCreate(coloName, redundancy, - NewPortfolio.class)); - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(coloName, "IdIndex1", - "r2.id", "/" + coloName + " r2", null)); - - // Creating Colocated Region DataStore node on the VM0. - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(name, - Portfolio.class)); - - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(name, "IdIndex2", - "r1.ID", "/" + name + " r1, r1.positions.values pos1", null)); - - // Creating local region on vm0 to compare the results of query. - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(localName, - Portfolio.class)); - - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(coloLocalName, - NewPortfolio.class)); - - // Generating portfolio object array to be populated across the PR's & Local Regions - Portfolio[] portfolio = createPortfoliosAndPositions(cntDest); - NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(cntDest); - - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(localName, portfolio, cnt, - cntDest)); - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloLocalName, newPortfolio, - cnt, cntDest)); - - // Putting the data into the PR's created - vm0.invoke( - prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(name, portfolio, cnt, cntDest)); - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloName, newPortfolio, cnt, - cntDest)); - - // querying the VM for data and comparing the result with query result of local region. - vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForRRAndPRQueryAndCompareResults(name, - coloName, localName, coloLocalName)); - } - - @Test public void testPRRRCompactRangeAndNestedRangeIndexQuerying() throws Exception { Host host = Host.getHost(0); VM vm0 = host.getVM(0); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java index 2cd5dc1..edb5cfa 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.CountDownLatch; import util.TestException; @@ -49,7 +48,6 @@ import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.Scope; import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionAdapter; import org.apache.geode.cache.execute.FunctionContext; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; @@ -76,8 +74,6 @@ import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.control.InternalResourceManager; -import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.SerializableRunnableIF; @@ -845,7 +841,6 @@ public class PRQueryDUnitHelper implements Serializable { "p, pos from /REGION_NAME p, p.positions.values pos order by p.ID, pos.secId",}; Object r[][] = new Object[queries.length][2]; - Region local = cache.getRegion(localRegion); Region region = cache.getRegion(regionName); assertNotNull(region); @@ -963,7 +958,6 @@ public class PRQueryDUnitHelper implements Serializable { "p.ID, pos.secId from /REGION_NAME p, p.positions.values pos order by p.ID desc, pos.secId",}; Object r[][] = new Object[1][2]; - Region local = cache.getRegion(localRegion); Region region = cache.getRegion(regionName); assertNotNull(region); @@ -1588,66 +1582,6 @@ public class PRQueryDUnitHelper implements Serializable { } /** - * This function <br> - * 1. calls the cache.close on the VM <br> - * 2. creates the cache again & also the PR <br> - * - * @return cacheSerializable object - * - * NOTE: Closing of the cache must be done from the test case rather than in - * PRQueryDUintHelper - * - */ - - public CacheSerializableRunnable getCacheSerializableRunnableForCacheClose( - final String regionName, final int redundancy, final Class constraint) { - SerializableRunnable PrRegion = new CacheSerializableRunnable("cacheClose") { - @Override - public void run2() throws CacheException { - final String expectedCacheClosedException = CacheClosedException.class.getName(); - final String expectedReplyException = ReplyException.class.getName(); - getCache().getLogger().info("<ExpectedException action=add>" + expectedCacheClosedException - + "</ExpectedException>"); - getCache().getLogger().info( - "<ExpectedException action=add>" + expectedReplyException + "</ExpectedException>"); - Cache cache = getCache(); - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info( - "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: Recreating the cache "); - AttributesFactory attr = new AttributesFactory(); - attr.setValueConstraint(constraint); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); - PartitionAttributes prAttr = paf.setRedundantCopies(redundancy).create(); - attr.setPartitionAttributes(prAttr); - final CountDownLatch cdl = new CountDownLatch(1); - ResourceObserverAdapter observer = new InternalResourceManager.ResourceObserverAdapter() { - @Override - public void recoveryFinished(Region region) { - cdl.countDown(); - } - }; - InternalResourceManager.setResourceObserver(observer); - try { - cache.createRegion(regionName, attr.create()); - // Wait for recovery to finish - cdl.await(); - } catch (InterruptedException e) { - Assert.fail("interupted", e); - } finally { - InternalResourceManager.setResourceObserver(null); - } - org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info( - "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: cache Recreated on VM "); - getCache().getLogger().info( - "<ExpectedException action=remove>" + expectedReplyException + "</ExpectedException>"); - getCache().getLogger().info("<ExpectedException action=remove>" - + expectedCacheClosedException + "</ExpectedException>"); - } - - }; - return (CacheSerializableRunnable) PrRegion; - } - - /** * This function creates a appropriate index on a PR given the name and other parameters. */ public CacheSerializableRunnable getCacheSerializableRunnableForPRIndexCreate( @@ -2046,7 +1980,7 @@ public class PRQueryDUnitHelper implements Serializable { } }; - return (CacheSerializableRunnable) PrRegion; + return PrRegion; } @@ -2160,7 +2094,7 @@ public class PRQueryDUnitHelper implements Serializable { } }; - return (CacheSerializableRunnable) PrRegion; + return PrRegion; } @@ -2272,7 +2206,7 @@ public class PRQueryDUnitHelper implements Serializable { } }; - return (CacheSerializableRunnable) PrRegion; + return PrRegion; } @@ -2288,11 +2222,14 @@ public class PRQueryDUnitHelper implements Serializable { Cache cache = getCache(); // Querying the PR region - String[] queries = new String[] {"r1.ID = r2.id", "r1.ID = r2.id AND r1.ID > 5", + String[] queries = new String[] {"r1.ID = r2.id", + "r1.ID = r2.id AND r1.ID > 5", "r1.ID = r2.id AND r1.status = 'active'", // "r1.ID = r2.id LIMIT 10", - "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id", - "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status", + "r1.ID = r2.id ORDER BY r1.ID", + "r1.ID = r2.id ORDER BY r2.id", + "r1.ID = r2.id ORDER BY r2.status", + "r1.ID = r2.id AND r1.status != r2.status", "r1.ID = r2.id AND r1.status = r2.status", "r1.ID = r2.id AND r1.positions.size = r2.positions.size", "r1.ID = r2.id AND r1.positions.size > r2.positions.size", @@ -2323,7 +2260,6 @@ public class PRQueryDUnitHelper implements Serializable { } QueryService qs = getCache().getQueryService(); - Object[] params; try { for (int j = 0; j < queries.length; j++) { getCache().getLogger().info("About to execute local query: " + queries[j]); @@ -2386,12 +2322,12 @@ public class PRQueryDUnitHelper implements Serializable { } }; - return (CacheSerializableRunnable) PrRegion; + return PrRegion; } // Helper classes and function - class TestQueryFunction extends FunctionAdapter { + public static class TestQueryFunction implements Function { @Override public boolean hasResult() { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java index c82f87e..0d0790c 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java @@ -66,7 +66,7 @@ public class IndexManagerJUnitTest { IndexManager.resetIndexBufferTime(); // fake entry update at LMT of 0 and actual time of 10 // safe query time set in index manager is going to be 20 - assertTrue(IndexManager.setIndexBufferTime(0, 10)); + IndexManager.setIndexBufferTime(0, 10); // fake query start at actual time of 9, 10, 11 and using the fake LMT of 0 assertTrue(IndexManager.needsRecalculation(9, 0)); @@ -84,13 +84,13 @@ public class IndexManagerJUnitTest { // Now let's assume a new update has occurred, this update delta and time combo still is not // larger - assertFalse(IndexManager.setIndexBufferTime(0, 9)); - assertFalse(IndexManager.setIndexBufferTime(1, 10)); + IndexManager.setIndexBufferTime(0, 9); + IndexManager.setIndexBufferTime(1, 10); // Now let's assume a new update has occurred where the time is larger (enough to roll off the // large delta) // but the delta is smaller - assertTrue(IndexManager.setIndexBufferTime(30, 30)); + IndexManager.setIndexBufferTime(30, 30); // Now that we have a small delta, let's see if a query that was "stuck" would reevaluate // appropriately @@ -103,7 +103,7 @@ public class IndexManagerJUnitTest { IndexManager.resetIndexBufferTime(); // fake entry update at LMT of 0 and actual time of 10 // safe query time set in index manager is going to be -10 - assertTrue(IndexManager.setIndexBufferTime(210, 200)); + IndexManager.setIndexBufferTime(210, 200); assertFalse(IndexManager.needsRecalculation(200, 190)); assertFalse(IndexManager.needsRecalculation(200, 200)); @@ -113,7 +113,7 @@ public class IndexManagerJUnitTest { assertTrue(IndexManager.needsRecalculation(200, 221)); // now lets say an entry updates with no delta - assertTrue(IndexManager.setIndexBufferTime(210, 210)); + IndexManager.setIndexBufferTime(210, 210); assertTrue(IndexManager.needsRecalculation(200, 190)); assertTrue(IndexManager.needsRecalculation(200, 200)); diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java new file mode 100644 index 0000000..4fc5f35 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.query.partitioned; + +import static org.apache.geode.cache.query.Utils.createNewPortfoliosAndPositions; +import static org.apache.geode.cache.query.Utils.createPortfoliosAndPositions; + +import java.util.ArrayList; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import parReg.query.unittest.NewPortfolio; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.data.Portfolio; +import org.apache.geode.cache.query.functional.StructSetOrResultsSet; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.junit.categories.OQLQueryTest; +import org.apache.geode.test.junit.rules.ServerStarterRule; + +@Category({OQLQueryTest.class}) +public class PRColocatedEquiJoinTest { + private static final int count = 100; + + @Rule + public ServerStarterRule server = new ServerStarterRule().withAutoStart(); + + @Test + public void prQueryWithHeteroIndex() throws Exception { + InternalCache cache = server.getCache(); + QueryService qs = cache.getQueryService(); + + // create a local and Partition region for 1st select query + Region r1 = server.createRegion(RegionShortcut.LOCAL, "region1", + rf -> rf.setValueConstraint(Portfolio.class)); + qs.createIndex("IdIndex1", "r.ID", "/region1 r, r.positions.values pos"); + Region r2 = server.createRegion(RegionShortcut.PARTITION, "region2", + rf -> rf.setValueConstraint(NewPortfolio.class)); + qs.createIndex("IdIndex2", "r.id", "/region2 r"); + + // create two local regions for 2nd select query to compare the result set + Region r3 = server.createRegion(RegionShortcut.LOCAL, "region3", + rf -> rf.setValueConstraint(Portfolio.class)); + Region r4 = server.createRegion(RegionShortcut.LOCAL, "region4", + rf -> rf.setValueConstraint(NewPortfolio.class)); + + Portfolio[] portfolio = createPortfoliosAndPositions(count); + NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(count); + + for (int i = 0; i < count; i++) { + r1.put(new Integer(i), portfolio[i]); + r2.put(new Integer(i), newPortfolio[i]); + r3.put(new Integer(i), portfolio[i]); + r4.put(new Integer(i), newPortfolio[i]); + } + + ArrayList results[][] = new ArrayList[whereClauses.length][2]; + for (int i = 0; i < whereClauses.length; i++) { + // issue the first select on region 1 and region 2 + SelectResults selectResults = (SelectResults) qs.newQuery("<trace> Select " + + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "") + + "* from /region1 r1, /region2 r2 where " + whereClauses[i]) + .execute(); + results[i][0] = (ArrayList) selectResults.asList(); + + // issue the second select on region 3 and region 4 + SelectResults queryResult = (SelectResults) qs.newQuery("<trace> Select " + + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "") + + "* from /region3 r1, /region4 r2 where " + whereClauses[i]) + .execute(); + results[i][1] = (ArrayList) queryResult.asList(); + } + + // compare the resultsets and expect them to be equal + StructSetOrResultsSet ssORrs = new StructSetOrResultsSet(); + ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(results, whereClauses.length, false, + false, + whereClauses); + } + + private static String[] whereClauses = new String[] {"r2.ID = r1.id", + "r1.ID = r2.id AND r1.ID > 5", + "r1.ID = r2.id AND r1.status = 'active'", + "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id", + "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status", + "r1.ID = r2.id AND r1.status = r2.status", + "r1.ID = r2.id AND r1.positions.size = r2.positions.size", + "r1.ID = r2.id AND r1.positions.size > r2.positions.size", + "r1.ID = r2.id AND r1.positions.size < r2.positions.size", + "r1.ID = r2.id AND r1.positions.size = r2.positions.size AND r2.positions.size > 0", + "r1.ID = r2.id AND (r1.positions.size > r2.positions.size OR r2.positions.size > 0)", + "r1.ID = r2.id AND (r1.positions.size < r2.positions.size OR r1.positions.size > 0)", + }; +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java index 9e88df3..5c715ad 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java @@ -183,6 +183,7 @@ public class IndexTrackingQueryObserver extends QueryObserverAdapter { return results.keySet(); } + // initial result of index in the observer. 0 means it's not updated yet. public void addRegionId(String regionId) { this.results.put(regionId, 0); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java index ec7086d..ef23565 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java @@ -482,7 +482,7 @@ public class QueryUtils { private static void mergeAndExpandCutDownRelationshipIndexResults(Object[][] values, SelectResults result, RuntimeIterator[][] indexFieldToItrsMapping, ListIterator expansionListIterator, List finalItrs, ExecutionContext context, - List[] checkList, CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level) + CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { @@ -506,7 +506,7 @@ public class QueryUtils { } } else { mergeAndExpandCutDownRelationshipIndexResults(values, result, indexFieldToItrsMapping, - expansionListIterator, finalItrs, context, checkList, iterOps, icdeh, level + 1); + expansionListIterator, finalItrs, context, iterOps, icdeh, level + 1); if (icdeh[level + 1].cutDownNeeded) { icdeh[level + 1].checkSet.clear(); } @@ -715,7 +715,6 @@ public class QueryUtils { // be a Compiled Region otherwise it will be a CompiledPath that // we can extract the id from. In the end the result will be the alias which is used as a // prefix - CompiledValue collectionExpression = currentLevel.getCmpIteratorDefn().getCollectionExpr(); String key = null; boolean useDerivedResults = true; if (currentLevel.getCmpIteratorDefn().getCollectionExpr() @@ -1250,7 +1249,7 @@ public class QueryUtils { maxCartesianDepth); } else { mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings, - expansionListIterator, finalList, context, totalCheckList, iterOperands, icdeh, + expansionListIterator, finalList, context, iterOperands, icdeh, 0); } if (icdeh[0].cutDownNeeded) @@ -1472,7 +1471,6 @@ public class QueryUtils { RuntimeIterator[][] mappings = new RuntimeIterator[2][]; mappings[0] = ich1.indexFieldToItrsMapping; mappings[1] = ich2.indexFieldToItrsMapping; - List[] totalCheckList = new List[] {ich1.checkList, ich2.checkList}; Iterator dataItr = data.iterator(); IndexCutDownExpansionHelper[] icdeh = new IndexCutDownExpansionHelper[] {new IndexCutDownExpansionHelper(ich1.checkList, context), @@ -1494,7 +1492,7 @@ public class QueryUtils { // skip the similar row of a set , even when the row in its entirety is unique ( made by // different data in the other set) mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings, - expansionListIterator, totalFinalList, context, totalCheckList, iterOperands, icdeh, + expansionListIterator, totalFinalList, context, iterOperands, icdeh, 0 /* Level */); if (icdeh[0].cutDownNeeded) icdeh[0].checkSet.clear(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java index de9c709..e3c33e8 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java @@ -181,11 +181,15 @@ public class IndexManager { * aggressively. But the large hiccup will eventually be rolled off as time is always increasing * This is a fix for #47475 * - * @param operationTime the last modified time from version tag + * @param lastModifiedTime the last modified time from version tag */ - public static boolean setIndexBufferTime(long operationTime, long currentCacheTime) { - long timeDifference = currentCacheTime - operationTime; - return setNewLargestValue(SAFE_QUERY_TIME, currentCacheTime + timeDifference); + public static void setIndexBufferTime(long lastModifiedTime, long currentCacheTime) { + long oldValue = SAFE_QUERY_TIME.get(); + long newValue = currentCacheTime + currentCacheTime - lastModifiedTime; + if (oldValue < newValue) { + // use compare and set in case the value has been changed since we got the old value + SAFE_QUERY_TIME.compareAndSet(oldValue, newValue); + } } /** @@ -213,21 +217,9 @@ public class IndexManager { * Small amounts of false positives are ok as it will have a slight impact on performance */ public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) { - return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION - && queryStartTime <= SAFE_QUERY_TIME.get() - queryStartTime + lastModifiedTime; - } - - private static boolean setNewLargestValue(AtomicLong value, long newValue) { - boolean done = false; - while (!done) { - long oldValue = value.get(); - if (oldValue < newValue) { - return value.compareAndSet(oldValue, newValue); - } else { - done = true; - } - } - return false; + boolean needsRecalculate = + (queryStartTime <= (lastModifiedTime + (SAFE_QUERY_TIME.get() - queryStartTime))); + return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION && needsRecalculate; } /** Test Hook */ diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java index 28d693d..f7e5b1c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java @@ -584,12 +584,9 @@ public class RangeIndex extends AbstractIndex { Object innerEntry = null; Object outerKey = null; Object innerKey = null; - // boolean incrementOuter = true; boolean incrementInner = true; outer: while (outer.hasNext()) { - // if (incrementOuter) { outerEntry = (Map.Entry) outer.next(); - // } outerKey = outerEntry.getKey(); while (!incrementInner || inner.hasNext()) { if (incrementInner) { @@ -611,7 +608,9 @@ public class RangeIndex extends AbstractIndex { } else { innerValue = ((Map.Entry) innerEntry).getValue(); } - populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context, null); + // GEODE-5440: need to pass in the key value to do EquiJoin + populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context, + outerEntry.getKey()); incrementInner = true; continue outer; } else if (compare < 0) { diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java index 234d90e..de71e7b 100644 --- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java +++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java @@ -42,6 +42,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.security.templates.UserPasswordAuthInit; +import org.apache.geode.test.dunit.DUnitEnv; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.SerializableConsumerIF; @@ -113,6 +114,13 @@ public class ClusterStartupRule extends ExternalResource implements Serializable this.vmCount = vmCount; } + /** + * Returns the port that the standard dunit locator is listening on. + */ + public static int getDUnitLocatorPort() { + return DUnitEnv.get().getLocatorPort(); + } + public static ClientCache getClientCache() { if (clientCacheRule == null) { return null; diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java index e1e7397..1b5d99f 100644 --- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java +++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java @@ -22,8 +22,12 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.function.Consumer; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.internal.DistributionConfig; @@ -167,6 +171,23 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl return this; } + public Region createRegion(RegionShortcut type, String name, + Consumer<RegionFactory> regionFactoryConsumer) { + RegionFactory factory = getCache().createRegionFactory(type); + regionFactoryConsumer.accept(factory); + return factory.create(name); + } + + public Region createPRRegion(String name, Consumer<RegionFactory> regionFactoryConsumer, + Consumer<PartitionAttributesFactory> prAttributesFactory) { + return createRegion(RegionShortcut.PARTITION, name, rf -> { + regionFactoryConsumer.accept(rf); + PartitionAttributesFactory factory = new PartitionAttributesFactory(); + prAttributesFactory.accept(factory); + rf.setPartitionAttributes(factory.create()); + }); + } + public void startServer(Properties properties, int locatorPort) { withProperties(properties).withConnectionToLocator(locatorPort).startServer(); }