This is an automated email from the ASF dual-hosted git repository. ladyvader pushed a commit to branch feature/GEODE-4881 in repository https://gitbox.apache.org/repos/asf/geode.git
commit eb3c725636b256ff8881dc949b2fa586631a349b Author: Lynn Hughes-Godfrey <lhughesgodf...@pivotal.io> AuthorDate: Fri Mar 16 16:47:55 2018 -0700 GEODE-4881: Support lucene reindexing (of existing data) with rebalance * LuceneServiceImpl changes to support AEQ addition to existing region (after index initialized) * Testing with and without redundancy --- .../cache/lucene/internal/LuceneServiceImpl.java | 12 +- ...ncyWithRegionCreatedBeforeReindexDUnitTest.java | 388 +++++++++++++++++++++ 2 files changed, 394 insertions(+), 6 deletions(-) diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 5d0ea48..01bc5c6 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -224,18 +224,15 @@ public class LuceneServiceImpl implements InternalLuceneService { LuceneSerializer serializer) { validateRegionAttributes(region.getAttributes()); - String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); - region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields, analyzer, fieldAnalyzers, serializer)); + String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); LuceneIndexImpl luceneIndex = beforeDataRegionCreated(indexName, regionPath, region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields); afterDataRegionCreated(luceneIndex); - region.addAsyncEventQueueId(aeqId, true); - createLuceneIndexOnDataRegion(region, luceneIndex); } @@ -309,15 +306,18 @@ public class LuceneServiceImpl implements InternalLuceneService { */ public void afterDataRegionCreated(InternalLuceneIndex index) { index.initialize(); - registerIndex(index); + if (this.managementListener != null) { this.managementListener.afterIndexCreated(index); } + String aeqId = LuceneServiceImpl.getUniqueIndexName(index.getName(), index.getRegionPath()); + + ((LuceneIndexImpl) index).getDataRegion().addAsyncEventQueueId(aeqId, true); PartitionedRepositoryManager repositoryManager = (PartitionedRepositoryManager) index.getRepositoryManager(); repositoryManager.allowRepositoryComputation(); - + registerIndex(index); } public LuceneIndexImpl beforeDataRegionCreated(final String indexName, final String regionPath, diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java new file mode 100644 index 0000000..3e0c430 --- /dev/null +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene; + +import static org.apache.geode.cache.lucene.test.IndexRepositorySpy.doAfterN; +import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.INDEX_NAME; +import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.REGION_NAME; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.control.RebalanceFactory; +import org.apache.geode.cache.control.RebalanceOperation; +import org.apache.geode.cache.control.RebalanceResults; +import org.apache.geode.cache.control.ResourceManager; +import org.apache.geode.cache.lucene.internal.LuceneIndexFactoryImpl; +import org.apache.geode.cache.lucene.internal.LuceneServiceImpl; +import org.apache.geode.cache.lucene.test.IndexRegionSpy; +import org.apache.geode.cache.lucene.test.IndexRepositorySpy; +import org.apache.geode.cache.lucene.test.LuceneTestUtilities; +import org.apache.geode.cache.partition.PartitionMemberInfo; +import org.apache.geode.cache.partition.PartitionRebalanceInfo; +import org.apache.geode.cache.partition.PartitionRegionInfo; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.internal.cache.InitialImageOperation; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook; +import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType; +import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.SerializableRunnableIF; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +@RunWith(JUnitParamsRunner.class) +public class RebalanceWithRedundancyWithRegionCreatedBeforeReindexDUnitTest + extends LuceneQueriesAccessorBase { + + protected VM dataStore3; + protected VM dataStore4; + + public void postSetUp() throws Exception { + super.postSetUp(); + dataStore3 = Host.getHost(0).getVM(2); + dataStore4 = Host.getHost(0).getVM(3); + } + + @Before + public void setNumBuckets() { + NUM_BUCKETS = 113; + } + + @Before + public void setLuceneReindexFlag() { + dataStore1.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = true); + dataStore2.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = true); + dataStore3.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = true); + dataStore4.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = true); + } + + @Override + protected RegionTestableType[] getListOfRegionTestTypes() { + return new RegionTestableType[] {RegionTestableType.PARTITION, + RegionTestableType.PARTITION_REDUNDANT}; + } + + @After + public void clearLuceneReindexFlag() { + dataStore1.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = false); + dataStore2.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = false); + dataStore3.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = false); + dataStore4.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = false); + } + + protected SerializableRunnable createIndex = new SerializableRunnable("createIndex") { + public void run() { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); + ((LuceneIndexFactoryImpl) luceneService.createIndexFactory()).addField("text") + .create(INDEX_NAME, REGION_NAME, LuceneServiceImpl.LUCENE_REINDEX); + } + }; + + protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") { + public void run() { + Cache cache = getCache(); + cache.getRegion(REGION_NAME); + + ResourceManager resMan = cache.getResourceManager(); + RebalanceFactory factory = resMan.createRebalanceFactory(); + System.out.println("Starting rebalance"); + RebalanceResults rebalanceResults = null; + RebalanceOperation rebalanceOp = factory.start(); + try { + rebalanceResults = rebalanceOp.getResults(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> rebalanceOp.isDone()); + while (!rebalanceOp.isDone()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + System.out.println("Rebalance completed: " + + RebalanceResultsToString(rebalanceResults, "Rebalance completed")); + } + }; + + protected SerializableRunnable doConcOps = new SerializableRunnable("doConcOps") { + public void run() { + putEntryInEachBucket(113); + } + }; + + private void createIndex() throws Exception { + // re-index stored data + AsyncInvocation ai1 = dataStore1.invokeAsync(createIndex); + AsyncInvocation ai2 = dataStore2.invokeAsync(createIndex); + AsyncInvocation ai3 = dataStore3.invokeAsync(createIndex); + + ai1.join(); + ai2.join(); + ai3.join(); + + ai1.checkException(); + ai2.checkException(); + ai3.checkException(); + } + + protected void createIndexAndRebalance(RegionTestableType regionTestType, + SerializableRunnableIF createIndex) throws Exception { + createIndexAndRebalance(regionTestType, createIndex, false); + } + + protected void createIndexAndRebalance(RegionTestableType regionTestType, + SerializableRunnableIF createIndex, boolean doOps) throws Exception { + + // give rebalance some work to do by adding another vm + // dataStore4.invoke(() -> (createIndex)); + dataStore4.invoke(() -> initDataStore(regionTestType)); + + AsyncInvocation aiRebalancer = dataStore1.invokeAsync(rebalance); + + if (doOps) { + AsyncInvocation aiConcOps = dataStore1.invokeAsync(doConcOps); + aiConcOps.join(); + aiConcOps.checkException(); + } + + // re-index stored data + AsyncInvocation ai1 = dataStore1.invokeAsync(createIndex); + AsyncInvocation ai2 = dataStore2.invokeAsync(createIndex); + AsyncInvocation ai3 = dataStore3.invokeAsync(createIndex); + AsyncInvocation ai4 = dataStore4.invokeAsync(createIndex); + + aiRebalancer.join(); + aiRebalancer.checkException(); + + ai1.join(); + ai2.join(); + ai3.join(); + ai4.join(); + + ai1.checkException(); + ai2.checkException(); + ai3.checkException(); + ai4.checkException(); + + } + + @Test + @Parameters(method = "getListOfRegionTestTypes") + public void returnCorrectResultsWithConcurrentOpsAndRebalance(RegionTestableType regionTestType) + throws Exception { + + createAndPopulateRegion(regionTestType, NUM_BUCKETS / 2); + + createIndexAndRebalance(regionTestType, createIndex, true); + + executeTextSearch(dataStore3, "world", "text", NUM_BUCKETS); + + } + + private void createAndPopulateRegion(RegionTestableType regionTestType) { + createAndPopulateRegion(regionTestType, NUM_BUCKETS); + } + + private void createAndPopulateRegion(RegionTestableType regionTestType, int numEntries) { + + dataStore1.invoke(() -> initDataStore(regionTestType)); + dataStore2.invoke(() -> initDataStore(regionTestType)); + dataStore3.invoke(() -> initDataStore(regionTestType)); + + putEntryInEachBucket(numEntries); + } + + protected void putEntryInEachBucket(int numBuckets) { + dataStore3.invoke(() -> { + final Cache cache = getCache(); + Region<Object, Object> region = cache.getRegion(REGION_NAME); + IntStream.range(0, numBuckets).forEach(i -> region.put(i, new TestObject("hello world"))); + }); + } + + public static String RebalanceResultsToString(RebalanceResults results, String title) { + if (results == null) { + return "null"; + } + StringBuffer aStr = new StringBuffer(); + aStr.append("Rebalance results (" + title + ") totalTime: " + + valueToString(results.getTotalTime()) + "\n"); + + // bucketCreates + aStr.append( + "totalBucketCreatesCompleted: " + valueToString(results.getTotalBucketCreatesCompleted())); + aStr.append(" totalBucketCreateBytes: " + valueToString(results.getTotalBucketCreateBytes())); + aStr.append( + " totalBucketCreateTime: " + valueToString(results.getTotalBucketCreateTime()) + "\n"); + + // bucketTransfers + aStr.append("totalBucketTransfersCompleted: " + + valueToString(results.getTotalBucketTransfersCompleted())); + aStr.append( + " totalBucketTransferBytes: " + valueToString(results.getTotalBucketTransferBytes())); + aStr.append( + " totalBucketTransferTime: " + valueToString(results.getTotalBucketTransferTime()) + "\n"); + + // primaryTransfers + aStr.append("totalPrimaryTransfersCompleted: " + + valueToString(results.getTotalPrimaryTransfersCompleted())); + aStr.append(" totalPrimaryTransferTime: " + valueToString(results.getTotalPrimaryTransferTime()) + + "\n"); + + // PartitionRebalanceDetails (per region) + Set<PartitionRebalanceInfo> prdSet = results.getPartitionRebalanceDetails(); + for (PartitionRebalanceInfo prd : prdSet) { + aStr.append(partitionRebalanceDetailsToString(prd)); + } + aStr.append("total time (ms): " + valueToString(results.getTotalTime())); + + String returnStr = aStr.toString(); + return returnStr; + } + + private static String partitionRebalanceDetailsToString(PartitionRebalanceInfo details) { + if (details == null) { + return "null\n"; + } + + StringBuffer aStr = new StringBuffer(); + aStr.append("PartitionedRegionDetails for region named " + getRegionName(details) + " time: " + + valueToString(details.getTime()) + "\n"); + + // bucketCreates + aStr.append("bucketCreatesCompleted: " + valueToString(details.getBucketCreatesCompleted())); + aStr.append(" bucketCreateBytes: " + valueToString(details.getBucketCreateBytes())); + aStr.append(" bucketCreateTime: " + valueToString(details.getBucketCreateTime()) + "\n"); + + // bucketTransfers + aStr.append( + "bucketTransfersCompleted: " + valueToString(details.getBucketTransfersCompleted())); + aStr.append(" bucketTransferBytes: " + valueToString(details.getBucketTransferBytes())); + aStr.append(" bucketTransferTime: " + valueToString(details.getBucketTransferTime()) + "\n"); + + // primaryTransfers + aStr.append( + "PrimaryTransfersCompleted: " + valueToString(details.getPrimaryTransfersCompleted())); + aStr.append(" PrimaryTransferTime: " + valueToString(details.getPrimaryTransferTime()) + "\n"); + + // PartitionMemberDetails (before) + aStr.append("PartitionedMemberDetails (before)\n"); + Set<PartitionMemberInfo> pmdSet = details.getPartitionMemberDetailsBefore(); + for (PartitionMemberInfo pmd : pmdSet) { + aStr.append(partitionMemberDetailsToString(pmd)); + } + + // PartitionMemberDetails (after) + aStr.append("PartitionedMemberDetails (after)\n"); + pmdSet = details.getPartitionMemberDetailsAfter(); + for (PartitionMemberInfo pmd : pmdSet) { + aStr.append(partitionMemberDetailsToString(pmd)); + } + + return aStr.toString(); + } + + public static String partitionedRegionDetailsToString(PartitionRegionInfo prd) { + + if (prd == null) { + return "null\n"; + } + + StringBuffer aStr = new StringBuffer(); + + aStr.append("PartitionedRegionDetails for region named " + getRegionName(prd) + "\n"); + aStr.append(" configuredBucketCount: " + valueToString(prd.getConfiguredBucketCount()) + "\n"); + aStr.append(" createdBucketCount: " + valueToString(prd.getCreatedBucketCount()) + "\n"); + aStr.append( + " lowRedundancyBucketCount: " + valueToString(prd.getLowRedundancyBucketCount()) + "\n"); + aStr.append( + " configuredRedundantCopies: " + valueToString(prd.getConfiguredRedundantCopies()) + "\n"); + aStr.append(" actualRedundantCopies: " + valueToString(prd.getActualRedundantCopies()) + "\n"); + + // memberDetails + Set<PartitionMemberInfo> pmd = prd.getPartitionMemberInfo(); + for (PartitionMemberInfo memberDetails : pmd) { + aStr.append(partitionMemberDetailsToString(memberDetails)); + } + + // colocatedWithDetails + String colocatedWith = prd.getColocatedWith(); + aStr.append(" colocatedWith: " + colocatedWith + "\n"); + + String returnStr = aStr.toString(); + return returnStr; + } + + private static String partitionMemberDetailsToString(PartitionMemberInfo pmd) { + StringBuffer aStr = new StringBuffer(); + long localMaxMemory = pmd.getConfiguredMaxMemory(); + long size = pmd.getSize(); + aStr.append(" Member Details for: " + pmd.getDistributedMember() + "\n"); + aStr.append(" configuredMaxMemory: " + valueToString(localMaxMemory)); + double inUse = (double) size / localMaxMemory; + double heapUtilization = inUse * 100; + aStr.append(" size: " + size + " (" + valueToString(heapUtilization) + "%)"); + aStr.append(" bucketCount: " + valueToString(pmd.getBucketCount())); + aStr.append(" primaryCount: " + valueToString(pmd.getPrimaryCount()) + "\n"); + return aStr.toString(); + } + + /** + * Convert the given long to a String; if it is negative then flag it in the string + */ + private static String valueToString(long value) { + String returnStr = "" + value; + return returnStr; + } + + /** + * Convert the given double to a String; if it is negative then flag it in the string + */ + private static String valueToString(double value) { + String returnStr = "" + value; + return returnStr; + } + + public static String getRegionName(PartitionRegionInfo prd) { + return prd.getRegionPath().substring(1); + } + + public static String getRegionName(PartitionRebalanceInfo prd) { + return prd.getRegionPath().substring(1); + } +} -- To stop receiving notification emails like this one, please contact ladyva...@apache.org.