This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch feature/GEODE-Lucene-Udo in repository https://gitbox.apache.org/repos/asf/geode.git
commit b6a75db290e48eae40a2d6846150d98c42c11fc7 Author: Udo Kohlmeyer <ukohlme...@pivotal.io> AuthorDate: Mon Apr 16 15:46:40 2018 -0700 Initial commit --- .../IncompatibleCacheServiceProfileException.java | 23 +++ .../geode/internal/cache/InternalRegion.java | 3 +- .../apache/geode/internal/cache/LocalRegion.java | 187 ++++++++++++--------- .../geode/internal/cache/PartitionedRegion.java | 10 ++ .../internal/cache/UpdateAttributesProcessor.java | 46 ++--- .../cache/lucene/internal/LuceneServiceImpl.java | 9 +- .../LuceneIndexCreationProfileDUnitTest.java | 109 ++++++++++++ .../LuceneIndexCreationProfileJUnitTest.java | 4 +- .../cache/lucene/test/LuceneTestUtilities.java | 12 +- 9 files changed, 288 insertions(+), 115 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java new file mode 100644 index 0000000..c9a7435 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import org.apache.geode.GemFireCheckedException; + +public class IncompatibleCacheServiceProfileException extends GemFireCheckedException { + public IncompatibleCacheServiceProfileException(String message) { + super(message); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java index 7c5d722..61360d6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java @@ -142,7 +142,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo InternalRegionArguments internalRegionArgs) throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException; - void addCacheServiceProfile(CacheServiceProfile profile); + void addCacheServiceProfile(CacheServiceProfile profile) + throws IncompatibleCacheServiceProfileException; void setEvictionMaximum(int maximum); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 3aab945..bb7a03a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -146,7 +146,6 @@ import org.apache.geode.cache.util.ObjectSizer; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionAdvisor; -import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.DistributionStats; @@ -709,7 +708,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return new Object(); } else { return this.fullPath; // avoids creating another sync object - could be anything unique to - // this region + // this region } } @@ -883,8 +882,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, LocalRegion newRegion = null; try { - if (getDestroyLock) + if (getDestroyLock) { acquireDestroyLock(); + } LocalRegion existing = null; try { if (isDestroyed()) { @@ -1063,7 +1063,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, false, // ifOld null, // expectedOldValue true // requireOldValue TODO txMerge why is oldValue required for - // create? I think so that the EntryExistsException will have it. + // create? I think so that the EntryExistsException will have it. )) { throw new EntryExistsException(event.getKey().toString(), event.getOldValue()); } else { @@ -1635,8 +1635,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, eventReturned = true; return event; } finally { - if (!eventReturned) + if (!eventReturned) { event.release(); + } } } @@ -2518,8 +2519,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } private static void releaseLatch(StoppableCountDownLatch latch) { - if (latch == null) + if (latch == null) { return; + } latch.countDown(); } @@ -3123,10 +3125,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, // check validity of key against keyConstraint if (this.keyConstraint != null) { - if (!this.keyConstraint.isInstance(key)) + if (!this.keyConstraint.isInstance(key)) { throw new ClassCastException( LocalizedStrings.LocalRegion_KEY_0_DOES_NOT_SATISFY_KEYCONSTRAINT_1 .toLocalizedString(key.getClass().getName(), this.keyConstraint.getName())); + } } // We don't need to check that the key is Serializable. Instead, @@ -3145,7 +3148,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, */ private final AtomicInteger tombstoneCount = new AtomicInteger(); - /** a boolean for issuing a client/server configuration mismatch message */ + /** + * a boolean for issuing a client/server configuration mismatch message + */ private boolean concurrencyMessageIssued; /** @@ -3199,7 +3204,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return this.cachePerfStats; } - /** regions track the number of tombstones their map holds for size calculations */ + /** + * regions track the number of tombstones their map holds for size calculations + */ public void incTombstoneCount(int delta) { this.tombstoneCount.addAndGet(delta); this.cachePerfStats.incTombstoneCount(delta); @@ -3331,7 +3338,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** local regions do not perform versioning */ + /** + * local regions do not perform versioning + */ protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) { if (this.getDataPolicy().withPersistence()) { return true; @@ -3381,9 +3390,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * returned. This method is intended for testing.testing purposes only. * * @throws EntryNotFoundException No entry with {@code key} exists - * * @see RegionMap#getEntry - * * @since GemFire 3.2 */ @Override @@ -3452,9 +3459,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * * @throws EntryNotFoundException No entry with {@code key} exists * @throws IllegalStateException If this region does not write to disk - * * @see RegionEntry#getValueOnDisk - * * @since GemFire 3.2 */ @Override @@ -3486,9 +3491,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * * @throws EntryNotFoundException No entry with {@code key} exists * @throws IllegalStateException If this region does not write to disk - * * @see RegionEntry#getValueOnDisk - * * @since GemFire 5.1 */ public Object getValueOnDiskOrBuffer(Object key) throws EntryNotFoundException { @@ -4396,12 +4399,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, clearViaFilterClass((String) key); break; case InterestType.KEY: - if (key instanceof String && key.equals("ALL_KEYS")) + if (key instanceof String && key.equals("ALL_KEYS")) { clearViaRegEx(".*"); - else if (key instanceof List) + } else if (key instanceof List) { clearViaList((List) key); - else + } else { localDestroyNoCallbacks(key); + } break; case InterestType.OQL_QUERY: clearViaQuery((String) key); @@ -4435,7 +4439,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** must be holding destroy lock */ + /** + * must be holding destroy lock + */ void reinitializeFromImageTarget(InternalDistributedMember imageTarget) throws TimeoutException, IOException, ClassNotFoundException { Assert.assertTrue(imageTarget != null); @@ -4450,7 +4456,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return this.reinitialized_new; } - /** must be holding destroy lock */ + /** + * must be holding destroy lock + */ void reinitialize_destroy(RegionEventImpl event) throws CacheWriterException, TimeoutException { final boolean cacheWrite = !event.originRemote; // register this region as reinitializing @@ -4458,7 +4466,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, basicDestroyRegion(event, cacheWrite, false/* lock */, true); } - /** must be holding destroy lock */ + /** + * must be holding destroy lock + */ private void recreate(InputStream inputStream, InternalDistributedMember imageTarget) throws TimeoutException, IOException, ClassNotFoundException { String thePath = getFullPath(); @@ -4682,7 +4692,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return latch.getCount() == 0; } - /** wait on the initialization Latch based on thread requirements */ + /** + * wait on the initialization Latch based on thread requirements + */ @Override public void waitOnInitialization() { if (this.initialized) { @@ -4736,7 +4748,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, waitOnInitialization(this.initializationLatchAfterGetInitialImage); } - /** return null if not found */ + /** + * return null if not found + */ @Override public RegionEntry basicGetEntry(Object key) { // ok to ignore tx state; all callers are non-transactional @@ -4873,7 +4887,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * than any other surviving members. So they shouldn't have any entries in their cache that match * entries that we failed to receive through the GII but are reflected in our current RVV. So it * should be safe to start with the current RVV. - * */ void repairRVV() { RegionVersionVector rvv = this.getVersionVector(); @@ -5031,7 +5044,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * * @param event the event object for this operation, with the exception that the oldValue * parameter is not yet filled in. The oldValue will be filled in by this operation. - * * @param ifNew true if this operation must not overwrite an existing key * @param ifOld true if this operation must not create a new key * @param expectedOldValue only succeed if old value is equal to this value. If null, then doesn't @@ -5570,7 +5582,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * * @param event the event object for this operation, with the exception that the oldValue * parameter is not yet filled in. The oldValue will be filled in by this operation. - * * @param ifNew true if this operation must not overwrite an existing key * @param ifOld true if this operation must not create a new entry * @param lastModified the lastModified time to set with the value; if 0L, then the lastModified @@ -6019,7 +6030,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * * @param task - a Runnable to wrap the processing of the bulk op * @param eventId - the base event ID of the bulk op - * * @since GemFire 5.7 */ public void syncBulkOp(Runnable task, EventID eventId) { @@ -6298,9 +6308,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, /** * Called after this region has been completely created * - * @since GemFire 5.0 - * * @see DistributedRegion#postDestroyRegion(boolean, RegionEventImpl) + * @since GemFire 5.0 */ @Override public void postCreateRegion() { @@ -6980,7 +6989,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** @return true if initialization is complete */ + /** + * @return true if initialization is complete + */ @Override public boolean isInitialized() { if (this.initialized) { @@ -7097,7 +7108,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * to map entry, and key must be in map * * @param lastModified time, may be 0 in which case uses now instead - * * @return the actual lastModifiedTime used. */ @Override @@ -7202,7 +7212,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** The listener is not closed until after the afterRegionDestroy event */ + /** + * The listener is not closed until after the afterRegionDestroy event + */ protected void closeCallbacksExceptListener() { closeCacheCallback(getCacheLoader()); closeCacheCallback(getCacheWriter()); @@ -7212,7 +7224,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** This is only done when the cache is closed. */ + /** + * This is only done when the cache is closed. + */ private void closeAllCallbacks() { closeCallbacksExceptListener(); CacheListener[] listeners = fetchCacheListenersField(); @@ -7578,7 +7592,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * the region have been set. * * @return {@code null} is a disk region is not desired - * * @since GemFire 3.2 */ protected DiskRegion createDiskRegion(InternalRegionArguments internalRegionArgs) @@ -7863,8 +7876,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, if (other.region != null) { return false; } - } else if (!this.region.equals(other.region)) + } else if (!this.region.equals(other.region)) { return false; + } return true; } @@ -8156,10 +8170,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private void cancelAllEntryExpiryTasks() { // This method gets called during LocalRegion construction // in which case the final entryExpiryTasks field can still be null - if (this.entryExpiryTasks == null) + if (this.entryExpiryTasks == null) { return; - if (this.entryExpiryTasks.isEmpty()) + } + if (this.entryExpiryTasks.isEmpty()) { return; + } boolean doPurge = false; for (EntryExpiryTask task : this.entryExpiryTasks.values()) { // no need to call incCancels since we will call forcePurge @@ -8227,7 +8243,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, getImageState().readUnlockRI(); } - /** doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor */ + /** + * doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor + */ LocalRegion basicGetParentRegion() { return this.parentRegion; } @@ -8403,7 +8421,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, this.entries.decTxRefCount(regionEntry); } - /** Does not throw RegionDestroyedException even if destroyed */ + /** + * Does not throw RegionDestroyedException even if destroyed + */ List debugGetSubregionNames() { List names = new ArrayList(); names.addAll(this.subregions.keySet()); @@ -8508,7 +8528,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** Set view of subregions */ + /** + * Set view of subregions + */ private class SubregionsSet extends AbstractSet { final boolean recursive; @@ -8663,7 +8685,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, this.key = regionEntry.getKey(); } - /** Internal method for getting the underlying RegionEntry */ + /** + * Internal method for getting the underlying RegionEntry + */ public RegionEntry getRegionEntry() { RegionEntry regionEntry = LocalRegion.this.getRegionMap().getEntry(this.key); if (regionEntry == null) { @@ -10310,7 +10334,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, FilterProfile filterProfile; /** - * * @return int array containing the IDs of the oplogs which will potentially get rolled else null * if no oplogs were available at the time of signal or region is not having disk * persistence. Pls note that the actual number of oplogs rolled may be more than what is @@ -10422,8 +10445,17 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return this.cacheServiceProfiles.getSnapshot(); } - public void addCacheServiceProfile(CacheServiceProfile profile) { - this.cacheServiceProfiles.put(profile.getId(), profile); + @Override + public void addCacheServiceProfile(CacheServiceProfile profile) + throws IncompatibleCacheServiceProfileException { + synchronized (this.cacheServiceProfiles) { + this.cacheServiceProfiles.putIfAbsent(profile.getId(), profile); + this.validateProfiles(); + } + } + + protected boolean validateProfiles() throws IncompatibleCacheServiceProfileException { + return true; } @Override @@ -10433,42 +10465,38 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, searcher); } - /** visitor over the CacheProfiles to check if the region has a CacheLoader */ + /** + * visitor over the CacheProfiles to check if the region has a CacheLoader + */ private static final DistributionAdvisor.ProfileVisitor<Void> netLoaderVisitor = - new DistributionAdvisor.ProfileVisitor<Void>() { - @Override - public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex, - int numProfiles, Void aggregate) { - assert profile instanceof CacheProfile; - final CacheProfile prof = (CacheProfile) profile; - - // if region in cache is not yet initialized, exclude - if (prof.regionInitialized) { // fix for bug 41102 - // cut the visit short if we find a CacheLoader - return !prof.hasCacheLoader; - } - // continue the visit - return true; + (advisor, profile, profileIndex, numProfiles, aggregate) -> { + assert profile instanceof CacheProfile; + final CacheProfile prof = (CacheProfile) profile; + + // if region in cache is not yet initialized, exclude + if (prof.regionInitialized) { // fix for bug 41102 + // cut the visit short if we find a CacheLoader + return !prof.hasCacheLoader; } + // continue the visit + return true; }; - /** visitor over the CacheProfiles to check if the region has a CacheWriter */ + /** + * visitor over the CacheProfiles to check if the region has a CacheWriter + */ private static final DistributionAdvisor.ProfileVisitor<Void> netWriterVisitor = - new DistributionAdvisor.ProfileVisitor<Void>() { - @Override - public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex, - int numProfiles, Void aggregate) { - assert profile instanceof CacheProfile; - final CacheProfile prof = (CacheProfile) profile; - - // if region in cache is in recovery - if (!prof.inRecovery) { - // cut the visit short if we find a CacheWriter - return !prof.hasCacheWriter; - } - // continue the visit - return true; + (advisor, profile, profileIndex, numProfiles, aggregate) -> { + assert profile instanceof CacheProfile; + final CacheProfile prof = (CacheProfile) profile; + + // if region in cache is in recovery + if (!prof.inRecovery) { + // cut the visit short if we find a CacheWriter + return !prof.hasCacheWriter; } + // continue the visit + return true; }; /** @@ -11458,7 +11486,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - /** test hook - dump the backing map for this region */ + /** + * test hook - dump the backing map for this region + */ public void dumpBackingMap() { synchronized (this.entries) { if (this.entries instanceof AbstractRegionMap) { @@ -11514,7 +11544,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @return previous value associated with specified key, or <tt>null</tt> if there was no mapping * for key. A <tt>null</tt> return can also indicate that the entry in the region was * previously in an invalidated state. - * * @throws ClassCastException if key does not satisfy the keyConstraint * @throws IllegalArgumentException if the key or value is not serializable and this is a * distributed region @@ -11990,14 +12019,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * @param indexName the name of the index * @param indexedExpression the index expression * @param fromClause the from clause. - * * @return The index map. - * * @throws IllegalStateException if this region is not using soplog persistence - * * @throws IllegalStateException if this index was previously persisted with a different * expression or from clause. - * */ public IndexMap getIndexMap(String indexName, String indexedExpression, String fromClause) { return new IndexMapImpl(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index a7d0800..58d2284 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -476,6 +476,16 @@ public class PartitionedRegion extends LocalRegion colocationListeners.remove(colocationListener); } + @Override + protected boolean validateProfiles() throws IncompatibleCacheServiceProfileException { + try { + new CreateRegionProcessor((PartitionedRegion) this).initializeRegion(); + return true; + } catch (Exception e) { + throw new IncompatibleCacheServiceProfileException(e.getMessage()); + } + // new UpdateAttributesProcessor((PartitionedRegion) this).validateProfiles() + } static PRIdMap getPrIdToPR() { return prIdToPR; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java index b0effb2..60e2527 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java @@ -20,6 +20,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Set; @@ -48,7 +49,6 @@ import org.apache.geode.internal.logging.LogService; * This class is a bit misnamed. It really has more with pushing a DistributionAdvisee's profile out * to others and, optionally if <code>profileExchange</code>, fetching the profile of anyone who * excepts the pushed profile. - * */ public class UpdateAttributesProcessor { private static final Logger logger = LogService.getLogger(); @@ -64,7 +64,9 @@ public class UpdateAttributesProcessor { private boolean removeProfile = false; private ReplyProcessor21 processor; - /** Creates a new instance of UpdateAttributesProcessor */ + /** + * Creates a new instance of UpdateAttributesProcessor + */ public UpdateAttributesProcessor(DistributionAdvisee da) { this(da, false); } @@ -115,6 +117,7 @@ public class UpdateAttributesProcessor { } } + public void sendProfileUpdate(boolean exchangeProfiles) { DistributionManager mgr = this.advisee.getDistributionManager(); DistributionAdvisor advisor = this.advisee.getDistributionAdvisor(); @@ -151,32 +154,31 @@ public class UpdateAttributesProcessor { return; } - ReplyProcessor21 processor = null; - // Scope scope = this.region.scope; - // always require an ack to prevent misordering of messages InternalDistributedSystem system = this.advisee.getSystem(); - processor = new UpdateAttributesReplyProcessor(system, recipients); - UpdateAttributesMessage message = getUpdateAttributesMessage(processor, recipients); + UpdateAttributesReplyProcessor replyProcessor = + new UpdateAttributesReplyProcessor(system, recipients); + UpdateAttributesMessage message = getUpdateAttributesMessage(replyProcessor); mgr.putOutgoing(message); - this.processor = processor; + this.processor = replyProcessor; } - - UpdateAttributesMessage getUpdateAttributesMessage(ReplyProcessor21 processor, Set recipients) { + UpdateAttributesMessage getUpdateAttributesMessage(UpdateAttributesReplyProcessor processor) { UpdateAttributesMessage msg = new UpdateAttributesMessage(); msg.adviseePath = this.advisee.getFullPath(); - msg.setRecipients(recipients); - if (processor != null) { - msg.processorId = processor.getProcessorId(); - } + msg.setRecipients(processor.getRecipients()); + msg.processorId = processor.getProcessorId(); msg.profile = this.advisee.getProfile(); msg.exchangeProfiles = this.profileExchange; msg.removeProfile = this.removeProfile; return msg; } + public void validateProfiles() { + + } + class UpdateAttributesReplyProcessor extends ReplyProcessor21 { UpdateAttributesReplyProcessor(InternalDistributedSystem system, Set members) { @@ -236,12 +238,9 @@ public class UpdateAttributesProcessor { if (msg instanceof ProfilesReplyMessage) { ProfilesReplyMessage reply = (ProfilesReplyMessage) msg; if (reply.profiles != null) { - for (int i = 0; i < reply.profiles.length; i++) { - // @todo Add putProfiles to DistributionAdvisor to do this - // with one call atomically? - UpdateAttributesProcessor.this.advisee.getDistributionAdvisor() - .putProfile(reply.profiles[i]); - } + Arrays.stream(reply.profiles) + .forEach((profile) -> UpdateAttributesProcessor.this.advisee + .getDistributionAdvisor().putProfile(profile)); } } else if (msg instanceof ProfileReplyMessage) { ProfileReplyMessage reply = (ProfileReplyMessage) msg; @@ -254,6 +253,10 @@ public class UpdateAttributesProcessor { super.process(msg); } } + + public Collection getRecipients() { + return Arrays.asList(members); + } } @@ -439,6 +442,7 @@ public class UpdateAttributesProcessor { } } + /** * Used to return multiple profiles * @@ -465,14 +469,12 @@ public class UpdateAttributesProcessor { } - @Override public int getDSFID() { return PROFILES_REPLY_MESSAGE; } - @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { super.fromData(in); diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 5756d71..e7cabce 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -69,6 +69,7 @@ import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.CacheService; +import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PrimaryBucketException; @@ -221,8 +222,12 @@ public class LuceneServiceImpl implements InternalLuceneService { LuceneSerializer serializer) { validateRegionAttributes(region.getAttributes()); - region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields, - analyzer, fieldAnalyzers, serializer)); + try { + region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields, + analyzer, fieldAnalyzers, serializer)); + } catch (IncompatibleCacheServiceProfileException e) { + e.printStackTrace(); + } String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); region.updatePRConfigWithNewGatewaySender(aeqId); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java new file mode 100644 index 0000000..17a3d6e --- /dev/null +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.awaitility.Awaitility; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedTestRule; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.categories.LuceneTest; + +@Category({DistributedTest.class, LuceneTest.class}) +public class LuceneIndexCreationProfileDUnitTest implements Serializable { + + private static final String INDEX_NAME = "index"; + private static final String REGION_NAME = "region"; + + @ClassRule + public static DistributedTestRule distributedTestRule = new DistributedTestRule(2); + + @Rule + public CacheRule cacheRule = CacheRule.builder() + .addSystemProperty(DistributionConfig.GEMFIRE_PREFIX + "luceneReindex", "true") + .createCacheInAll().disconnectAfter().build(); + + @Test + public void testConcurrentIndexCreationWithDifferentProfiles() { + VM vm0 = VM.getVM(0); + VM vm1 = VM.getVM(1); + + vm0.invoke(this::setupCacheAndRegion); + vm1.invoke(this::setupCacheAndRegion); + + vm0.invoke(() -> { + Region<Object, Object> region = cacheRule.getCache().getRegion(REGION_NAME); + for (int i = 0; i < 113; i++) { + region.put(i, i); + } + }); + + AsyncInvocation<Boolean> asyncInvocation0 = vm0.invokeAsync(() -> { + PartitionedRegion region1 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + try { + region1.addCacheServiceProfile(getOneFieldLuceneIndexCreationProfile()); + return false; + } catch (IncompatibleCacheServiceProfileException e) { + e.printStackTrace(); + return true; + } + }); + + AsyncInvocation<Boolean> asyncInvocation1 = vm1.invokeAsync(() -> { + PartitionedRegion region2 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + try { + region2.addCacheServiceProfile(getTwoFieldLuceneIndexCreationProfile()); + return false; + } catch (IncompatibleCacheServiceProfileException e) { + e.printStackTrace(); + return true; + } + }); + + Awaitility.waitAtMost(30, TimeUnit.SECONDS) + .until(() -> asyncInvocation0.get() && asyncInvocation1.get()); + } + + private void setupCacheAndRegion() { + InternalCache cache = cacheRule.getCache(); + cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME); + } + + private LuceneIndexCreationProfile getOneFieldLuceneIndexCreationProfile() { + return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME, new String[] {"field1"}, + new StandardAnalyzer(), null, null); + } + + private LuceneIndexCreationProfile getTwoFieldLuceneIndexCreationProfile() { + return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME, + new String[] {"field1", "field2"}, new StandardAnalyzer(), null, null); + } +} diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java index 8a88e72..7e76b93 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java @@ -61,11 +61,11 @@ public class LuceneIndexCreationProfileJUnitTest { } private Object[] getSerializationProfiles() { - return $(new Object[] {getOneFieldLuceneIndexCreationProfile()}, + return new Object[] {new Object[] {getOneFieldLuceneIndexCreationProfile()}, new Object[] {getTwoFieldLuceneIndexCreationProfile()}, new Object[] {getTwoAnalyzersLuceneIndexCreationProfile()}, new Object[] {getDummySerializerCreationProfile()}, - new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()}); + new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()}}; } @Test diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java index e3c2001..091d4fe 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java @@ -63,7 +63,7 @@ public class LuceneTestUtilities { public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_FIELDS = "Cannot create Lucene index index on region /region with fields [field1, field2] because another member defines the same index with fields [field1]."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_FIELDS_2 = - "Cannot create Lucene index index on region /region with fields [field1] because another member defines the same index with fields [field1, field2]."; + "Cannot create Lucene index on region /region with fields [field1] because another member defines the same index with fields [field1, field2]."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_ANALYZERS = "Cannot create Lucene index index on region /region with analyzer StandardAnalyzer on field field2 because another member defines the same index with analyzer KeywordAnalyzer on that field."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_ANALYZERS_1 = @@ -73,20 +73,18 @@ public class LuceneTestUtilities { public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_ANALYZERS_3 = "Cannot create Lucene index index on region /region with analyzer KeywordAnalyzer on field field2 because another member defines the same index with analyzer StandardAnalyzer on that field."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_NAMES = - "Cannot create Lucene index index2 on region /region because it is not defined in another member."; + "Cannot create Lucene index2 on region /region because it is not defined in another member."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_1 = "Must create Lucene index index on region /region because it is defined in another member."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_2 = - "Cannot create Lucene index index2 on region /region because it is not defined in another member."; + "Cannot create Lucene index2 on region /region because it is not defined in another member."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_3 = "Cannot create Lucene index index on region /region because it is not defined in another member."; public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_SERIALIZER = "Cannot create Lucene index index on region /region with serializer DummyLuceneSerializer because another member defines the same index with different serializer HeterogeneousLuceneSerializer."; - public static String Quarter1 = "Q1"; - public static String Quarter2 = "Q2"; - public static String Quarter3 = "Q3"; - public static String Quarter4 = "Q4"; + private static String Quarter1 = "Q1"; + private static String Quarter2 = "Q2"; public static void verifyResultOrder(Collection<EntryScore<String>> list, EntryScore<String>... expectedEntries) { -- To stop receiving notification emails like this one, please contact u...@apache.org.