http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index bbff29c..b3c23b1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -14,19 +14,62 @@ */ package org.apache.geode.internal.cache; -import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE; -import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetAddress; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.StatisticsFactory; import org.apache.geode.SystemFailure; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.DiskStoreFactory; @@ -72,55 +115,11 @@ import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.PdxField; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.PeerTypeRegistration; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.PrintStream; -import java.net.InetAddress; 
-import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Represents a (disk-based) persistent store for region data. Used for both persistent recoverable * regions and overflow-only regions. - * - * + * * @since GemFire 3.2 */ @SuppressWarnings("synthetic-access") @@ -128,6 +127,7 @@ public class DiskStoreImpl implements DiskStore { private static final Logger logger = LogService.getLogger(); private static final String BACKUP_DIR_PREFIX = "dir"; + public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG"); public static final int MAX_OPEN_INACTIVE_OPLOGS = @@ -166,6 +166,7 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_VALUE_PROPERTY_NAME = DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValues"; + public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME = DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValuesSync"; @@ -177,9 +178,12 @@ public class DiskStoreImpl implements DiskStore { DistributionConfig.GEMFIRE_PREFIX + "disk.recoverLruValues"; boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true); + boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false); + boolean FORCE_KRF_RECOVERY = getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disk.FORCE_KRF_RECOVERY", false); + final boolean RECOVER_LRU_VALUES = getBoolean(DiskStoreImpl.RECOVER_LRU_VALUES_PROPERTY_NAME, false); @@ -188,7 +192,9 @@ public class DiskStoreImpl implements DiskStore { } public static final long MIN_RESERVED_DRID = 1; + public static final long MAX_RESERVED_DRID = 8; + static final long MIN_DRID = MAX_RESERVED_DRID + 1; /** @@ -205,9 +211,7 @@ public class DiskStoreImpl implements DiskStore { private final int MAX_OPLOGS_PER_COMPACTION = Integer.getInteger( DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION", Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_ROLL", 1).intValue()); - /** - * - */ + public static final int MAX_CONCURRENT_COMPACTIONS = Integer.getInteger( DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_COMPACTIONS", Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_ROLLS", 1).intValue()); @@ -219,6 +223,7 @@ public class 
DiskStoreImpl implements DiskStore { */ public static final int MAX_PENDING_TASKS = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "disk.MAX_PENDING_TASKS", 6); + /** * This system property indicates that IF should also be preallocated. This property will be used * in conjunction with the PREALLOCATE_OPLOGS property. If PREALLOCATE_OPLOGS is ON the below will @@ -227,6 +232,7 @@ public class DiskStoreImpl implements DiskStore { static final boolean PREALLOCATE_IF = !System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "preAllocateIF", "true") .equalsIgnoreCase("false"); + /** * This system property indicates that Oplogs should be preallocated till the maxOplogSize as * specified for the disk store. @@ -252,19 +258,14 @@ public class DiskStoreImpl implements DiskStore { public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS; public static volatile HashSet<String> TEST_NO_FALLOC_DIRS; - // /** delay for slowing down recovery, for testing purposes only */ - // public static volatile int recoverDelay = 0; - - // //////////////////// Instance Fields /////////////////////// - - private final GemFireCacheImpl cache; + private final InternalCache cache; /** The stats for this store */ private final DiskStoreStats stats; /** - * Asif:Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of - * the threads acquiring read lock, etc is not a good idea to solve the issue + * Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of the + * threads acquiring read lock, etc is not a good idea to solve the issue */ private final AtomicInteger entryOpsCount = new AtomicInteger(); /** @@ -291,10 +292,11 @@ public class DiskStoreImpl implements DiskStore { * is forced. If this value is 0 then no limit. */ private final int maxAsyncItems; + private final AtomicInteger forceFlushCount; + private final Object asyncMonitor; - // complex vars /** Compactor task which does the compaction. Null if compaction not possible. */ private final OplogCompactor oplogCompactor; @@ -303,7 +305,9 @@ public class DiskStoreImpl implements DiskStore { private volatile DiskStoreBackup diskStoreBackup = null; private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock(); + private final WriteLock compactorWriteLock = compactorLock.writeLock(); + private final ReadLock compactorReadLock = compactorLock.readLock(); /** @@ -316,37 +320,21 @@ public class DiskStoreImpl implements DiskStore { new AtomicReference<DiskAccessException>(); PersistentOplogSet persistentOplogs = new PersistentOplogSet(this); - OverflowOplogSet overflowOplogs = new OverflowOplogSet(this); - - // private boolean isThreadWaitingForSpace = false; - - /** - * Get the next available dir - */ - - // /** - // * Max timed wait for disk space to become available for an entry operation - // , - // * in milliseconds. This will be the maximum time for which a - // * create/modify/remove operation will wait so as to allow switch over & get - // a - // * new Oplog for writing. If no space is available in that time, - // * DiskAccessException will be thrown. The default wait will be for 120 - // * seconds - // */ - // private static final long MAX_WAIT_FOR_SPACE = Integer.getInteger( - // "MAX_WAIT_FOR_SPACE", 20).intValue() * 1000; + OverflowOplogSet overflowOplogs = new OverflowOplogSet(this); private final AtomicLong regionIdCtr = new AtomicLong(MIN_DRID); + /** * Only contains backup DiskRegions. 
The Value could be a RecoveredDiskRegion or a DiskRegion */ private final ConcurrentMap<Long, DiskRegion> drMap = new ConcurrentHashMap<Long, DiskRegion>(); + /** * A set of overflow only regions that are using this disk store. */ private final Set<DiskRegion> overflowMap = new ConcurrentHashSet<DiskRegion>(); + /** * Contains all of the disk recovery stores for which we are recovering values asnynchronously. */ @@ -369,9 +357,8 @@ public class DiskStoreImpl implements DiskStore { private final ThreadPoolExecutor diskStoreTaskPool; private final ThreadPoolExecutor delayedWritePool; - private volatile Future lastDelayedWrite; - // ///////////////////// Constructors ///////////////////////// + private volatile Future lastDelayedWrite; private static int calcCompactionThreshold(int ct) { if (ct == DiskStoreFactory.DEFAULT_COMPACTION_THRESHOLD) { @@ -387,19 +374,19 @@ public class DiskStoreImpl implements DiskStore { } /** - * Creates a new <code>DiskRegion</code> that access disk on behalf of the given region. + * Creates a new {@code DiskRegion} that access disk on behalf of the given region. */ - DiskStoreImpl(Cache cache, DiskStoreAttributes props) { + DiskStoreImpl(InternalCache cache, DiskStoreAttributes props) { this(cache, props, false, null); } - DiskStoreImpl(Cache cache, DiskStoreAttributes props, boolean ownedByRegion, + DiskStoreImpl(InternalCache cache, DiskStoreAttributes props, boolean ownedByRegion, InternalRegionArguments internalRegionArgs) { this(cache, props.getName(), props, ownedByRegion, internalRegionArgs, false, false/* upgradeVersionOnly */, false, false, true, false/* offlineModify */); } - DiskStoreImpl(Cache cache, String name, DiskStoreAttributes props, boolean ownedByRegion, + DiskStoreImpl(InternalCache cache, String name, DiskStoreAttributes props, boolean ownedByRegion, InternalRegionArguments internalRegionArgs, boolean offline, boolean upgradeVersionOnly, boolean offlineValidating, boolean offlineCompacting, boolean needsOplogs, boolean offlineModify) { @@ -427,7 +414,7 @@ public class DiskStoreImpl implements DiskStore { this.warningPercent = props.getDiskUsageWarningPercentage(); this.criticalPercent = props.getDiskUsageCriticalPercentage(); - this.cache = (GemFireCacheImpl) cache; + this.cache = cache; StatisticsFactory factory = cache.getDistributedSystem(); this.stats = new DiskStoreStats(factory, getName()); @@ -474,7 +461,7 @@ public class DiskStoreImpl implements DiskStore { this.maxDirSize = tempMaxDirSize * 1024 * 1024; this.infoFileDirIndex = 0; // Now that we no longer have db files, use all directories for oplogs - /** + /* * The infoFileDir contains the lock file and the init file. It will be directories[0] on a * brand new disk store. On an existing disk store it will be the directory the init file is * found in. 
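Note on the constructor hunks above: the commit narrows the constructor and field types from the concrete GemFireCacheImpl (and the public Cache interface plus a downcast) to the internal InternalCache interface, so `this.cache = (GemFireCacheImpl) cache;` becomes a plain assignment. A minimal sketch of that dependency-narrowing pattern, using stand-in names rather than the real Geode types:

    // Stand-ins for InternalCache / GemFireCacheImpl; names are illustrative only.
    interface InternalCacheLike {
      Object getDistributedSystem();
    }

    class GemFireCacheLike implements InternalCacheLike {
      public Object getDistributedSystem() { return new Object(); }
    }

    class DiskStoreLike {
      private final InternalCacheLike cache; // was the concrete type, forcing a downcast

      DiskStoreLike(InternalCacheLike cache) {
        this.cache = cache; // no (GemFireCacheLike) cast needed once the field is interface-typed
      }
    }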
@@ -495,7 +482,7 @@ public class DiskStoreImpl implements DiskStore { int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS; final ThreadGroup compactThreadGroup = - LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", this.logger); + LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger); final ThreadFactory compactThreadFactory = GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor"); this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 10, TimeUnit.SECONDS, @@ -504,7 +491,7 @@ public class DiskStoreImpl implements DiskStore { final ThreadGroup deleteThreadGroup = - LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", this.logger); + LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", logger); final ThreadFactory deleteThreadFactory = GemfireCacheHelper.CreateThreadFactory(deleteThreadGroup, "Oplog Delete Task"); @@ -583,7 +570,7 @@ public class DiskStoreImpl implements DiskStore { } /** - * Returns the <code>DiskStoreStats</code> for this store + * Returns the {@code DiskStoreStats} for this store */ public DiskStoreStats getStats() { return this.stats; @@ -697,7 +684,7 @@ public class DiskStoreImpl implements DiskStore { * @param entry The entry which is going to be written to disk * @throws RegionClearedException If a clear operation completed before the put operation * completed successfully, resulting in the put operation to abort. - * @throws IllegalArgumentException If <code>id</code> is less than zero + * @throws IllegalArgumentException If {@code id} is less than zero */ final void put(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) throws RegionClearedException { @@ -886,7 +873,6 @@ public class DiskStoreImpl implements DiskStore { * Given a BytesAndBits object convert it to the relevant Object (deserialize if necessary) and * return the object * - * @param bb * @return the converted object */ static Object convertBytesAndBitsIntoObject(BytesAndBits bb) { @@ -909,7 +895,6 @@ public class DiskStoreImpl implements DiskStore { /** * Given a BytesAndBits object get the serialized blob * - * @param bb * @return the converted object */ static Object convertBytesAndBitsToSerializedForm(BytesAndBits bb) { @@ -1029,7 +1014,7 @@ public class DiskStoreImpl implements DiskStore { * HTree with the oplog being destroyed * * @return null if entry has nothing stored on disk (id == INVALID_ID) - * @throws IllegalArgumentException If <code>id</code> is less than zero, no action is taken. + * @throws IllegalArgumentException If {@code id} is less than zero, no action is taken. */ public final Object getNoBuffer(DiskRegion dr, DiskId id) { BytesAndBits bb = null; @@ -1067,8 +1052,8 @@ public class DiskStoreImpl implements DiskStore { * * @throws RegionClearedException If a clear operation completed before the put operation * completed successfully, resulting in the put operation to abort. - * @throws IllegalArgumentException If <code>id</code> is {@linkplain #INVALID_ID invalid}or is - * less than zero, no action is taken. + * @throws IllegalArgumentException If {@code id} is {@linkplain #INVALID_ID invalid}or is less + * than zero, no action is taken. 
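Several hunks in this file rename unused catch parameters from `ie`/`e` to `ignore` while keeping the interrupt-restoring idiom intact, as in the flusherThread.join() hunk above. A self-contained sketch of that idiom (the class and helper names are illustrative, not Geode APIs):

    class FlusherJoin {
      // Swallow-but-restore: the exception is not rethrown, but the interrupt
      // flag is re-asserted so callers up the stack can still observe it.
      static boolean joinQuietly(Thread flusher, long waitMs) {
        try {
          flusher.join(waitMs);
          return true;
        } catch (InterruptedException ignore) {
          Thread.currentThread().interrupt();
        }
        return false;
      }
    }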
*/ final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) throws RegionClearedException { @@ -1191,7 +1176,7 @@ public class DiskStoreImpl implements DiskStore { if (currentOpsInProgress == 0) { synchronized (this.closeRegionGuard) { if (dr.isRegionClosed() && entryOpsCount.get() == 0) { - this.closeRegionGuard.notify(); + this.closeRegionGuard.notifyAll(); } } } @@ -1237,7 +1222,6 @@ public class DiskStoreImpl implements DiskStore { /** * Get serialized form of data off the disk * - * @param id * @since GemFire 5.7 */ public Object getSerializedData(DiskRegion dr, DiskId id) { @@ -1269,7 +1253,7 @@ public class DiskStoreImpl implements DiskStore { DiskEntry entry = ade.de; DiskEntry.Helper.handleFullAsyncQueue(entry, region, tag); } - } catch (RegionDestroyedException ex) { + } catch (RegionDestroyedException ignore) { // Normally we flush before closing or destroying a region // but in some cases it is closed w/o flushing. // So just ignore it; see bug 41305. @@ -1397,8 +1381,7 @@ public class DiskStoreImpl implements DiskStore { private int fillDrainList() { synchronized (this.drainSync) { this.drainList = new ArrayList(asyncQueue.size()); - int drainCount = asyncQueue.drainTo(this.drainList); - return drainCount; + return asyncQueue.drainTo(this.drainList); } } @@ -1410,8 +1393,6 @@ public class DiskStoreImpl implements DiskStore { * To fix bug 41770 clear the list in a way that will not break a concurrent iterator that is not * synced on drainSync. Only clear from it entries on the given region. Currently we do this by * clearing the isPendingAsync bit on each entry in this list. - * - * @param rvv */ void clearDrainList(LocalRegion r, RegionVersionVector rvv) { synchronized (this.drainSync) { @@ -1516,7 +1497,7 @@ public class DiskStoreImpl implements DiskStore { try { this.flusherThread.join(waitMs); return true; - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } return false; @@ -1532,7 +1513,7 @@ public class DiskStoreImpl implements DiskStore { } } - public GemFireCacheImpl getCache() { + public InternalCache getCache() { return this.cache; } @@ -1759,7 +1740,7 @@ public class DiskStoreImpl implements DiskStore { } } } // else - } catch (RegionDestroyedException ex) { + } catch (RegionDestroyedException ignore) { // Normally we flush before closing or destroying a region // but in some cases it is closed w/o flushing. // So just ignore it; see bug 41305. 
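The notify() -> notifyAll() switch on closeRegionGuard above matters when more than one thread can park on the same monitor: notify() wakes a single arbitrary waiter and can strand the rest, while notifyAll() forces every waiter to re-check its condition. A minimal sketch of the guarded-block pattern involved (class and method names are illustrative, not Geode's actual guard):

    class OpsGuard {
      private int opsInProgress;

      synchronized void opStarted() {
        opsInProgress++;
      }

      synchronized void opFinished() {
        if (--opsInProgress == 0) {
          notifyAll(); // wake every waiter; each re-tests its condition below
        }
      }

      synchronized void awaitQuiesce() throws InterruptedException {
        while (opsInProgress > 0) { // loop guards against spurious wakeups
          wait(20000);
        }
      }
    }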
@@ -2050,18 +2031,8 @@ public class DiskStoreImpl implements DiskStore { return this.directories[this.infoFileDirIndex]; } - /** For Testing * */ - // void addToOplogSet(int oplogID, File opFile, DirectoryHolder dirHolder) { - // Oplog oplog = new Oplog(oplogID, this); - // oplog.addRecoveredFile(opFile, dirHolder); - // // @todo check callers to see if they need drf support - // this.oplogSet.add(oplog); - // } - - /** For Testing * */ /** * returns the size of the biggest directory available to the region - * */ public long getMaxDirSize() { return maxDirSize; @@ -2143,8 +2114,6 @@ public class DiskStoreImpl implements DiskStore { /** * Removes anything found in the async queue for the given region - * - * @param rvv */ private void clearAsyncQueue(LocalRegion region, boolean needsWriteLock, RegionVersionVector rvv) { @@ -2263,7 +2232,7 @@ public class DiskStoreImpl implements DiskStore { if (diskException.get() != null) { try { _testHandleDiskAccessException.await(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -2466,25 +2435,26 @@ public class DiskStoreImpl implements DiskStore { dr.setRegionClosed(true); } gotLock = true; - } catch (CancelException e) { + } catch (CancelException ignore) { synchronized (this.closeRegionGuard) { if (!dr.isRegionClosed()) { if (!closeDataOnly) { dr.setRegionClosed(true); } - // Asif: I am quite sure that it should also be Ok if instead + // I am quite sure that it should also be Ok if instead // while it is a If Check below. Because if acquireReadLock // thread - // has acquired thelock, it is bound to see the isRegionClose as + // has acquired the lock, it is bound to see the isRegionClose as // true - // and so will realse teh lock causing decrement to zeo , before + // and so will release the lock causing decrement to zero , before // releasing the closeRegionGuard. But still...not to take any // chance while (this.entryOpsCount.get() > 0) { try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(20000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignored) { // Exit without closing the region, do not know what else // can be done Thread.currentThread().interrupt(); @@ -2534,8 +2504,6 @@ public class DiskStoreImpl implements DiskStore { /** * stops the compactor outside the write lock. Once stopped then it proceeds to destroy the * current & old oplogs - * - * @param dr */ void beginDestroyRegion(LocalRegion region, DiskRegion dr) { if (dr.isBackup()) { @@ -2571,7 +2539,7 @@ public class DiskStoreImpl implements DiskStore { while (this.backgroundTasks.get() > 0) { try { this.backgroundTasks.wait(500L); - } catch (InterruptedException ex) { + } catch (InterruptedException ignore) { interrupted = true; } } @@ -2720,7 +2688,7 @@ public class DiskStoreImpl implements DiskStore { return null; } - return l.toArray(new CompactableOplog[0]); + return l.toArray(new CompactableOplog[l.size()]); } /** @@ -2745,7 +2713,6 @@ public class DiskStoreImpl implements DiskStore { * @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in * the restore script. * @return an array of Oplogs to be copied for an incremental backup. 
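The toArray hunks above (here and in filterBaselineOplogs below) pass a presized array instead of a zero-length or empty-literal one. A small sketch of the difference; on recent JVMs the zero-length form performs comparably, so this is largely a stylistic normalization:

    import java.util.List;

    class SnapshotUtil {
      // Presized form, as adopted in this commit: the collection fills the
      // array it is handed, rather than discarding a zero-length array and
      // allocating a correctly sized one internally.
      static String[] snapshot(List<String> l) {
        return l.toArray(new String[l.size()]);
      }
    }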
- * @throws IOException */ private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector, Map<File, File> baselineCopyMap) throws IOException { @@ -2796,11 +2763,9 @@ public class DiskStoreImpl implements DiskStore { } // Convert the filtered oplog list to an array - return oplogList.toArray(new Oplog[] {}); + return oplogList.toArray(new Oplog[oplogList.size()]); } - - /** * Get all of the oplogs */ @@ -3013,7 +2978,7 @@ public class DiskStoreImpl implements DiskStore { while (this.scheduled) { try { wait(); - } catch (InterruptedException ex) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -3114,30 +3079,13 @@ public class DiskStoreImpl implements DiskStore { if (dr.isRegionClosed()) { return; } - // // Stop the compactor if running, without taking lock. - // if (this.oplogCompactor != null) { - // try { - // this.oplogCompactor.stopCompactor(); - // } - // catch (CancelException ignore) { - // // Asif:To fix Bug 39380 , ignore the cache closed exception here. - // // allow it to call super .close so that it would be able to close - // the - // // oplogs - // // Though I do not think this exception will be thrown by - // // the stopCompactor. Still not taking chance and ignoring it - - // } - // } - // // if (!isSync()) { - // stopAsyncFlusher(true); // do this before writeLock - // // } + boolean gotLock = false; try { try { acquireWriteLock(dr); gotLock = true; - } catch (CancelException e) { + } catch (CancelException ignore) { // see workaround below. } @@ -3163,8 +3111,9 @@ public class DiskStoreImpl implements DiskStore { } boolean interrupted = Thread.interrupted(); try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(1000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -3175,7 +3124,7 @@ public class DiskStoreImpl implements DiskStore { if (this.entryOpsCount.get() > 0) { logger.warn(LocalizedMessage.create( LocalizedStrings.DisKRegion_OUTSTANDING_OPS_REMAIN_AFTER_0_SECONDS_FOR_DISK_REGION_1, - new Object[] {Integer.valueOf(loopCount), dr.getName()})); + new Object[] {loopCount, dr.getName()})); for (;;) { if (this.entryOpsCount.get() == 0) { @@ -3183,8 +3132,9 @@ public class DiskStoreImpl implements DiskStore { } boolean interrupted = Thread.interrupted(); try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(1000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -3233,7 +3183,7 @@ public class DiskStoreImpl implements DiskStore { dr.resetRVV(); dr.setRVVTrusted(false); dr.writeRVV(null, null); // just persist the empty rvv with trust=false - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException ignore) { // ignore a RegionDestroyedException at this stage } if (this.initFile != null && dr.isBackup()) { @@ -4111,11 +4061,6 @@ public class DiskStoreImpl implements DiskStore { * Start the backup process. This is the second step of the backup process. In this method, we * define the data we're backing up by copying the init file and rolling to the next file. After * this method returns operations can proceed as normal, except that we don't remove oplogs. 
- * - * @param targetDir - * @param baselineInspector - * @param restoreScript - * @throws IOException */ public void startBackup(File targetDir, BackupInspector baselineInspector, RestoreScript restoreScript) throws IOException { @@ -4130,7 +4075,7 @@ public class DiskStoreImpl implements DiskStore { } // Get an appropriate lock object for each set of oplogs. - Object childLock = childOplog.lock;; + Object childLock = childOplog.lock; // TODO - We really should move this lock into the disk store, but // until then we need to do this magic to make sure we're actually @@ -4201,9 +4146,6 @@ public class DiskStoreImpl implements DiskStore { /** * Copy the oplogs to the backup directory. This is the final step of the backup process. The * oplogs we copy are defined in the startBackup method. - * - * @param backupManager - * @throws IOException */ public void finishBackup(BackupManager backupManager) throws IOException { if (diskStoreBackup == null) { @@ -4312,17 +4254,17 @@ public class DiskStoreImpl implements DiskStore { props.setProperty(CACHE_XML_FILE, ""); DistributedSystem ds = DistributedSystem.connect(props); offlineDS = ds; - Cache c = org.apache.geode.cache.CacheFactory.create(ds); - offlineCache = c; - org.apache.geode.cache.DiskStoreFactory dsf = c.createDiskStoreFactory(); + InternalCache cache = (InternalCache) CacheFactory.create(ds); + offlineCache = cache; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); dsf.setDiskDirs(dsDirs); if (offlineCompacting && maxOplogSize != -1L) { dsf.setMaxOplogSize(maxOplogSize); } - DiskStoreImpl dsi = new DiskStoreImpl(c, dsName, + DiskStoreImpl dsi = new DiskStoreImpl(cache, dsName, ((DiskStoreFactoryImpl) dsf).getDiskStoreAttributes(), false, null, true, upgradeVersionOnly, offlineValidate, offlineCompacting, needsOplogs, offlineModify); - ((GemFireCacheImpl) c).addDiskStore(dsi); + cache.addDiskStore(dsi); return dsi; } @@ -4536,7 +4478,7 @@ public class DiskStoreImpl implements DiskStore { while (!isClosing() && currentAsyncValueRecoveryMap.containsKey(diskRegion.getId())) { try { currentAsyncValueRecoveryMap.wait(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } } @@ -4591,9 +4533,9 @@ public class DiskStoreImpl implements DiskStore { if (lastWriteTask != null) { try { lastWriteTask.get(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); - } catch (Exception e) { + } catch (Exception ignore) { // do nothing, an exception from the write task was already logged. } } @@ -4684,7 +4626,7 @@ public class DiskStoreImpl implements DiskStore { delayedWritePool.shutdown(); try { delayedWritePool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } }
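The close path at the end of this file bounds how long shutdown waits for the delayed-write pool and restores the interrupt flag rather than propagating. A self-contained sketch of that bounded-shutdown idiom (the helper class is hypothetical, not a Geode API):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class PoolCloser {
      // Request shutdown, wait briefly for in-flight tasks, and re-assert the
      // interrupt flag instead of throwing InterruptedException from close().
      static void closeQuietly(ExecutorService pool) {
        pool.shutdown();
        try {
          pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {
          Thread.currentThread().interrupt();
        }
      }
    }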
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java index 551f733..ac72361 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java @@ -14,6 +14,19 @@ */ package org.apache.geode.internal.cache; +import java.io.File; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.cache.DiskAccessException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -22,25 +35,16 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.*; public class DiskStoreMonitor { private static final Logger logger = LogService.getLogger(); private static final boolean DISABLE_MONITOR = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_DISABLE_MONITORING"); - // private static final boolean AUTO_RECONNECT = - // Boolean.getBoolean("gemfire.DISK_USAGE_ENABLE_AUTO_RECONNECT"); private static final int USAGE_CHECK_INTERVAL = Integer .getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_POLLING_INTERVAL_MILLIS", 10000); + private static final float LOG_WARNING_THRESHOLD_PCT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_LOG_WARNING_PERCENT", 99); @@ -67,7 +71,7 @@ public class DiskStoreMonitor { if (val < 0 || val > 100) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_WARNING_INVALID_0 - .toLocalizedString(Float.valueOf(val))); + .toLocalizedString(val)); } } @@ -80,17 +84,15 @@ public class DiskStoreMonitor { if (val < 0 || val > 100) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_CRITICAL_INVALID_0 - .toLocalizedString(Float.valueOf(val))); + .toLocalizedString(val)); } } private final ScheduledExecutorService exec; private final Map<DiskStoreImpl, Set<DirectoryHolderUsage>> disks; - private final LogUsage logDisk; - // // this is set when we go into auto_reconnect mode - // private volatile DirectoryHolderUsage criticalDisk; + private final LogUsage logDisk; volatile DiskStateAction _testAction; @@ -209,9 +211,9 @@ public class DiskStoreMonitor { private File getLogDir() { File log = null; - GemFireCacheImpl gci = GemFireCacheImpl.getInstance(); - if (gci != null) { - InternalDistributedSystem ds = gci.getInternalDistributedSystem(); + InternalCache internalCache = GemFireCacheImpl.getInstance(); + if (internalCache != null) { + 
InternalDistributedSystem ds = internalCache.getInternalDistributedSystem(); if (ds != null) { DistributionConfig conf = ds.getConfig(); if (conf != null) { @@ -230,7 +232,7 @@ public class DiskStoreMonitor { return log; } - abstract class DiskUsage { + abstract static class DiskUsage { private DiskState state; DiskUsage() { @@ -305,7 +307,7 @@ public class DiskStoreMonitor { protected abstract void handleStateChange(DiskState next, String pct); } - class LogUsage extends DiskUsage { + static class LogUsage extends DiskUsage { private final File dir; public LogUsage(File dir) { @@ -382,41 +384,12 @@ public class DiskStoreMonitor { logger.error(LogMarker.DISK_STORE_MONITOR, LocalizedMessage.create(LocalizedStrings.DiskStoreMonitor_DISK_CRITICAL, args)); - try { - // // prepare for restart - // if (AUTO_RECONNECT) { - // disk.getCache().saveCacheXmlForReconnect(); - // criticalDisk = this; - // } - } finally { - // pull the plug - disk.handleDiskAccessException(new DiskAccessException(msg, disk)); - } + // TODO: this is weird... + disk.handleDiskAccessException(new DiskAccessException(msg, disk)); break; } } - // private void performReconnect(String msg) { - // try { - // // don't try to reconnect before the cache is closed - // disk._testHandleDiskAccessException.await(); - // - // // now reconnect, clear out the var first so a close can interrupt the - // // reconnect - // criticalDisk = null; - // boolean restart = disk.getCache().getDistributedSystem().tryReconnect(true, msg, - // disk.getCache()); - // if (LogMarker.DISK_STORE_MONITOR || logger.isDebugEnabled()) { - // String pre = restart ? "Successfully" : "Unsuccessfully"; - // logger.info(LocalizedStrings.DEBUG, pre + " attempted to restart cache"); - // } - // } catch (InterruptedException e) { - // Thread.currentThread().interrupt(); - // } finally { - // close(); - // } - // } - @Override protected File dir() { return dir.getDir(); http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java index 36ad9ce..e22e1d9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java @@ -48,12 +48,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; -/** - * - */ public class DistTXCommitMessage extends TXMessage { private static final Logger logger = LogService.getLogger(); + protected ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null; /** for deserialization */ @@ -75,7 +73,7 @@ public class DistTXCommitMessage extends TXMessage { logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); final TXStateProxy txStateProxy = txMgr.getTXState(); TXCommitMessage cmsg = null; @@ -256,7 +254,7 @@ public class DistTXCommitMessage extends TXMessage { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("DistTXCommitPhaseTwoReplyMessage 
").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); @@ -339,7 +337,7 @@ public class DistTXCommitMessage extends TXMessage { (DistTxCommitExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -349,7 +347,7 @@ public class DistTXCommitMessage extends TXMessage { (DistTxCommitExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -387,14 +385,12 @@ public class DistTXCommitMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -428,16 +424,13 @@ public class DistTXCommitMessage extends TXMessage { public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) { http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java index ffbc3ba..0f7aa72 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java @@ -53,12 +53,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; -/** - * - */ public final class DistTXPrecommitMessage extends TXMessage { private static final Logger logger = LogService.getLogger(); + ArrayList<DistTxEntryEvent> secondaryTransactionalOperations; /** for deserialization */ @@ -76,7 +74,7 @@ public final class DistTXPrecommitMessage extends TXMessage { @Override protected boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); if (logger.isDebugEnabled()) { @@ -132,7 +130,7 @@ public final class DistTXPrecommitMessage extends TXMessage { } // Send Response : Send false if conflict - 
DistTxPrecommitResponse finalResponse = new DistTxPrecommitResponse(precommitSuccess, + DistTxPreCommitResponse finalResponse = new DistTxPreCommitResponse(precommitSuccess, new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values())); DistTXPrecommitReplyMessage.send(getSender(), getProcessorId(), finalResponse, getReplySender(dm)); @@ -176,7 +174,7 @@ public final class DistTXPrecommitMessage extends TXMessage { * This is the reply to a {@link DistTXPrecommitMessage}. */ public static final class DistTXPrecommitReplyMessage extends ReplyMessage { - private transient DistTxPrecommitResponse commitResponse; + private transient DistTxPreCommitResponse commitResponse; /** * Empty constructor to conform to DataSerializable interface @@ -187,7 +185,7 @@ public final class DistTXPrecommitMessage extends TXMessage { fromData(in); } - private DistTXPrecommitReplyMessage(int processorId, DistTxPrecommitResponse val) { + private DistTXPrecommitReplyMessage(int processorId, DistTxPreCommitResponse val) { setProcessorId(processorId); this.commitResponse = val; } @@ -209,7 +207,7 @@ public final class DistTXPrecommitMessage extends TXMessage { * @param replySender distribution manager used to send the reply */ public static void send(InternalDistributedMember recipient, int processorId, - DistTxPrecommitResponse val, ReplySender replySender) throws RemoteOperationException { + DistTxPreCommitResponse val, ReplySender replySender) throws RemoteOperationException { Assert.assertTrue(recipient != null, "DistTXPhaseOneCommitReplyMessage NULL reply message"); DistTXPrecommitReplyMessage m = new DistTXPrecommitReplyMessage(processorId, val); m.setRecipient(recipient); @@ -253,18 +251,18 @@ public final class DistTXPrecommitMessage extends TXMessage { @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { super.fromData(in); - this.commitResponse = (DistTxPrecommitResponse) DataSerializer.readObject(in); + this.commitResponse = (DistTxPreCommitResponse) DataSerializer.readObject(in); } @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("DistTXPhaseOneCommitReplyMessage").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); } - public DistTxPrecommitResponse getCommitResponse() { + public DistTxPreCommitResponse getCommitResponse() { return commitResponse; } } @@ -279,7 +277,7 @@ public final class DistTXPrecommitMessage extends TXMessage { */ public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 { private HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap; - private Map<DistributedMember, DistTxPrecommitResponse> commitResponseMap; + private Map<DistributedMember, DistTxPreCommitResponse> commitResponseMap; private transient TXId txIdent = null; public DistTxPrecommitReplyProcessor(TXId txUniqId, DM dm, Set initMembers, @@ -288,7 +286,7 @@ public final class DistTXPrecommitMessage extends TXMessage { this.msgMap = msgMap; // [DISTTX] TODO Do we need synchronised map? 
this.commitResponseMap = - Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPrecommitResponse>()); + Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPreCommitResponse>()); this.txIdent = txUniqId; } @@ -340,7 +338,7 @@ public final class DistTXPrecommitMessage extends TXMessage { (DistTxPrecommitExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -350,11 +348,11 @@ public final class DistTXPrecommitMessage extends TXMessage { (DistTxPrecommitExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } - public Map<DistributedMember, DistTxPrecommitResponse> getCommitResponseMap() { + public Map<DistributedMember, DistTxPreCommitResponse> getCommitResponseMap() { return commitResponseMap; } } @@ -388,14 +386,12 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -429,16 +425,13 @@ public final class DistTXPrecommitMessage extends TXMessage { public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) { @@ -465,14 +458,14 @@ public final class DistTXPrecommitMessage extends TXMessage { } } - public static final class DistTxPrecommitResponse implements DataSerializableFixedID { + public static final class DistTxPreCommitResponse implements DataSerializableFixedID { private transient Boolean commitState; private transient ArrayList<ArrayList<DistTxThinEntryState>> distTxEventList; // Default constructor for serialisation - public DistTxPrecommitResponse() {} + public DistTxPreCommitResponse() {} - public DistTxPrecommitResponse(boolean precommitSuccess, + public DistTxPreCommitResponse(boolean precommitSuccess, ArrayList<ArrayList<DistTxThinEntryState>> eventList) { this.commitState = precommitSuccess; this.distTxEventList = eventList; http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java index bfe302a..d4f5943 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java @@ -75,7 +75,7 @@ public final class DistTXRollbackMessage extends TXMessage { logger.debug("Dist TX: Rollback: {}", txId); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); final TXStateProxy txState = txMgr.getTXState(); boolean rollbackSuccessful = false; @@ -87,10 +87,6 @@ public final class DistTXRollbackMessage extends TXMessage { "DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}", txId); } - // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId); - // if (txMgr.isExceptionToken(cmsg)) { - // throw txMgr.getExceptionForToken(cmsg, txId); - // } } else if (txState != null) { // [DISTTX] TODO - Handle scenarios of no txState // if no TXState was created (e.g. due to only getEntry/size operations @@ -219,7 +215,7 @@ public final class DistTXRollbackMessage extends TXMessage { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("DistTXRollbackReplyMessage ").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); @@ -232,7 +228,6 @@ public final class DistTXRollbackMessage extends TXMessage { /** * A processor to capture the value returned by {@link DistTXRollbackReplyMessage} - * */ public static class DistTXRollbackResponse extends RemoteOperationResponse { private volatile Boolean rollbackState; @@ -275,9 +270,6 @@ public final class DistTXRollbackMessage extends TXMessage { final String msg = "DistTXRollbackResponse got RemoteOperationException; rethrowing"; logger.debug(msg, e); throw e; - } catch (TransactionDataNotColocatedException e) { - // Throw this up to user! - throw e; } return rollbackState; } @@ -354,7 +346,7 @@ public final class DistTXRollbackMessage extends TXMessage { (DistTxRollbackExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -364,7 +356,7 @@ public final class DistTXRollbackMessage extends TXMessage { (DistTxRollbackExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -402,14 +394,12 @@ public final class DistTXRollbackMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". 
Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -443,16 +433,13 @@ public final class DistTXRollbackMessage extends TXMessage { public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) { http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java index 68bde4e..50f36c2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java @@ -15,10 +15,8 @@ package org.apache.geode.internal.cache; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.Map.Entry; @@ -32,10 +30,9 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse; -import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData; -import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData; +import org.apache.geode.internal.cache.DistTXPrecommitMessage.DistTxPreCommitResponse; import org.apache.geode.internal.cache.TXEntryState.DistTxThinEntryState; +import org.apache.geode.internal.cache.partitioned.Bucket; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.cache.tx.DistClientTXStateStub; import org.apache.geode.internal.cache.tx.DistTxEntryEvent; @@ -50,8 +47,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { */ protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals = new HashMap<>(); + private HashMap<LocalRegion, DistributedMember> rrTargets; + private Set<DistributedMember> txRemoteParticpants = null; // other than local + protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null; public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id, @@ -132,8 +132,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { * those */ private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() { - final GemFireCacheImpl cache = - GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps"); + final InternalCache cache = 
GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps"); InternalDistributedMember currentNode = cache.getInternalDistributedSystem().getDistributedMember(); @@ -143,7 +142,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { DistributedMember originalTarget = e.getKey(); DistTXCoordinatorInterface distPeerTxStateStub = e.getValue(); - ArrayList<DistTxEntryEvent> primaryTxOps = + Iterable<DistTxEntryEvent> primaryTxOps = distPeerTxStateStub.getPrimaryTransactionalOperations(); for (DistTxEntryEvent dtop : primaryTxOps) { LocalRegion lr = dtop.getRegion(); @@ -155,8 +154,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { allNodes.remove(originalTarget); otherNodes = allNodes; } else if (lr instanceof DistributedRegion) { - otherNodes = - ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates(); + otherNodes = ((CacheDistributionAdvisee) lr).getCacheDistributionAdvisor() + .adviseInitializedReplicates(); otherNodes.remove(originalTarget); } @@ -196,7 +195,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { } boolean finalResult = false; - final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Rollback"); + final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Rollback"); final DM dm = cache.getDistributionManager(); try { // Create Tx Participants @@ -319,7 +318,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { if (r instanceof PartitionedRegion) { target = getOwnerForKey(r, key); } else if (r instanceof BucketRegion) { - target = ((BucketRegion) r).getBucketAdvisor().getPrimary(); + target = ((Bucket) r).getBucketAdvisor().getPrimary(); // target = r.getMyId(); } else { // replicated region target = getRRTarget(key, r); @@ -390,7 +389,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName())); } - target2realDeals.put(target, (DistPeerTXStateStub) realDeal); + target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal); if (logger.isDebugEnabled()) { logger.debug( "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = " @@ -438,7 +437,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { private boolean doPrecommit() { boolean finalResult = true; - final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Precommit"); + final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Precommit"); final DM dm = cache.getDistributionManager(); // Create Tx Participants @@ -468,7 +467,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { } else if (lr instanceof PartitionedRegion || lr instanceof BucketRegion) { final PartitionedRegion pr; if (lr instanceof BucketRegion) { - pr = ((BucketRegion) lr).getPartitionedRegion(); + pr = ((Bucket) lr).getPartitionedRegion(); } else { pr = (PartitionedRegion) lr; } @@ -528,8 +527,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { new TreeMap<String, ArrayList<DistTxThinEntryState>>(); ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null; if (localResult) { - localResult = ((DistTXStateOnCoordinator) localTXState) - .populateDistTxEntryStateList(entryStateSortedMap); + localResult = + 
((DistTXState) localTXState).populateDistTxEntryStateList(entryStateSortedMap); if (localResult) { entryEventList = new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values()); @@ -572,11 +571,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { // [DISTTX} TODO Handle stats // dm.getStats().incCommitWaits(); - Map<DistributedMember, DistTxPrecommitResponse> remoteResults = + Map<DistributedMember, DistTxPreCommitResponse> remoteResults = processor.getCommitResponseMap(); - for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) { + for (Entry<DistributedMember, DistTxPreCommitResponse> e : remoteResults.entrySet()) { DistributedMember target = e.getKey(); - DistTxPrecommitResponse remoteResponse = e.getValue(); + DistTxPreCommitResponse remoteResponse = e.getValue(); ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = remoteResponse.getDistTxEntryEventList(); populateEntryEventMap(target, entryEventList, sortedRegionName); @@ -665,7 +664,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { */ private boolean doCommit() { boolean finalResult = true; - final GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying Dist TX Commit"); + final InternalCache cache = GemFireCacheImpl.getExisting("Applying Dist TX Commit"); final DM dm = cache.getDistributionManager(); // Create Tx Participants @@ -716,7 +715,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { localTXState.getClass().getSimpleName())); } populateEntryEventList(dm.getId(), entryEventList, sortedRegionName); - ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList); + ((DistTXState) localTXState).setDistTxEntryStates(entryEventList); localTXState.commit(); TXCommitMessage localResultMsg = localTXState.getCommitMessage(); if (logger.isDebugEnabled()) { @@ -821,7 +820,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { Object key = putallOp.putAllData[i].key; int bucketId = putallOp.putAllData[i].getBucketId(); - DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);; + DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId); if (putAllForBucket == null) { // TODO DISTTX: event is never released EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region, @@ -982,7 +981,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl { public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) { DistributedMember m = r.getOwnerForKey(key); if (m == null) { - GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey"); + InternalCache cache = GemFireCacheImpl.getExisting("getOwnerForKey"); m = cache.getDistributedSystem().getDistributedMember(); } return m; http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java index 0a9ccd8..7ba7d0c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java @@ -12,7 +12,6 @@ * or implied. 
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;

 import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
 import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.util.DelayedAction;

-/**
- *
- */
 public abstract class DistributedCacheOperation {

   private static final Logger logger = LogService.getLogger();

   public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
   public static Random LOSS_SIMULATION_GENERATOR;

   public static long SLOW_DISTRIBUTION_MS = 0; // test hook

   // constants used in subclasses and distribution messages
   // should use enum in source level 1.5+
+
   /**
    * Deserialization policy: do not deserialize (for byte array, null or cases where the value
    * should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
     }
   }

-  public final static byte DESERIALIZATION_POLICY_NUMBITS =
+  public static final byte DESERIALIZATION_POLICY_NUMBITS =
       DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);

   public static final short DESERIALIZATION_POLICY_END =
       (short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
   public static final short DESERIALIZATION_POLICY_MASK =
       (short) (DESERIALIZATION_POLICY_END - 1);

   public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
     try {
       _distribute();
     } catch (InvalidVersionException e) {
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
        logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again",
            e);
      }
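The log-level fix above (and another like it in the next hunk) corrects a guard/statement mismatch: an isDebugEnabled() check wrapped around a logger.trace(...) call means the statement's arguments are built, and then discarded, whenever the level is DEBUG, while the guard adds nothing at TRACE. A small sketch using the Log4j 2 API this class already imports:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    public class GuardLevelSketch {
      private static final Logger logger = LogManager.getLogger(GuardLevelSketch.class);

      public static void main(String[] args) {
        // Mismatched: at level DEBUG the guard passes, the call is made,
        // and trace() drops the message anyway.
        if (logger.isDebugEnabled()) {
          logger.trace("versions were missing; retrying");
        }
        // Matched: the guard tests the same level the statement logs at.
        if (logger.isTraceEnabled()) {
          logger.trace("versions were missing; retrying");
        }
      }
    }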
@@ -283,7 +283,7 @@
       DistributedRegion region = getRegion();
       if (viewVersion != -1) {
         region.getDistributionAdvisor().endOperation(viewVersion);
-        if (logger.isDebugEnabled()) {
+        if (logger.isTraceEnabled()) {
           logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
               viewVersion);
         }
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
     if (SLOW_DISTRIBUTION_MS > 0) { // test hook
       try {
         Thread.sleep(SLOW_DISTRIBUTION_MS);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
       }
       SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
     }

     // some members requiring old value are also in the cache op recipients set
-    Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+    Set needsOldValueInCacheOp = Collections.emptySet();

     // set client routing information into the event
     boolean routingComputed = false;
     FilterRoutingInfo filterRouting = null;
     // recipients that will get a cacheop msg and also a PR message
-    Set twoMessages = Collections.EMPTY_SET;
+    Set twoMessages = Collections.emptySet();
     if (region.isUsedForPartitionedRegionBucket()) {
-      twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+      twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
       routingComputed = true;
       filterRouting = getRecipientFilterRouting(recipients);
       if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {

     // some members need PR notification of the change for client/wan
     // notification
-    Set adjunctRecipients = Collections.EMPTY_SET;
+    Set adjunctRecipients = Collections.emptySet();

     // Partitioned region listener notification messages piggyback on this
     // operation's replyprocessor and need to be sent at the same time as
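The Collections.EMPTY_SET fields replaced above are raw java.util.Set constants; the generic Collections.emptySet() factory yields the same immutable empty set without an unchecked conversion at typed assignment sites. A quick illustration:

    import java.util.Collections;
    import java.util.Set;

    public class EmptySetSketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        Set<String> raw = Collections.EMPTY_SET;    // compiles only via an unchecked conversion
        Set<String> typed = Collections.emptySet(); // element type inferred, no warning

        System.out.println(raw.isEmpty() && typed.isEmpty()); // true
        // In the OpenJDK implementation both expressions return the same shared
        // immutable instance, so the change costs nothing at runtime.
      }
    }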
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
       recipients.removeAll(needsOldValueInCacheOp);
     }

-    Set cachelessNodes = Collections.EMPTY_SET;
-    Set adviseCacheServers = Collections.EMPTY_SET;
-    Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
-        new HashSet<InternalDistributedMember>();
+    Set cachelessNodes = Collections.emptySet();
+    Set adviseCacheServers;
+    Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
     if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
       cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
       if (!cachelessNodes.isEmpty()) {
         List list = new ArrayList(cachelessNodes);
         for (Object member : cachelessNodes) {
-          if (!recipients.contains(member)) {
+          if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
             // Don't include those originally excluded.
             list.remove(member);
-          } else if (adjunctRecipients.contains(member)) {
-            list.remove(member);
           }
         }
         cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
       if (!reliableOp || region.isNoDistributionOk()) {
         // nothing needs be done in this case
       } else {
-        region.handleReliableDistribution(Collections.EMPTY_SET);
+        region.handleReliableDistribution(Collections.emptySet());
       }

-      /** compute local client routing before waiting for an ack only for a bucket */
+      // compute local client routing before waiting for an ack only for a bucket
       if (region.isUsedForPartitionedRegionBucket()) {
         FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
         this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
     } else {
       boolean directAck = false;
       boolean useMulticast = region.getMulticastEnabled()
-          && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+          && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
       boolean shouldAck = shouldAck();

       if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
               recipients);
         }
         waitForMembers.removeAll(recipients);
-        recipients = Collections.EMPTY_SET;
+        recipients = Collections.emptySet();
       }
     }
     if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
         }

         adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
-        adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+        adviseCacheServers = ((Bucket) region).getPartitionedRegion()
             .getCacheDistributionAdvisor().adviseCacheServers();
         adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
       }
     }

-    /** compute local client routing before waiting for an ack only for a bucket */
+    // compute local client routing before waiting for an ack only for a bucket
     if (region.isUsedForPartitionedRegionBucket()) {
       FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
       event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
     }
   }

-
   /**
    * Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
    * key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
         continue;
       }

-      CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+      CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
           .getCacheDistributionAdvisor().getProfile(m);

       if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
         continue;
       }

-
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;

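In the @@ -377 hunk above, two branches with the same body (list.remove(member)) collapse into a single condition joined by ||. Since neither test has side effects, the rewrite is behavior-preserving; a runnable check with stand-in lists (the names are illustrative, not the recipient sets in the code):

    import java.util.ArrayList;
    import java.util.List;

    public class BranchMergeSketch {
      public static void main(String[] args) {
        List<String> recipients = List.of("a");
        List<String> adjunct = List.of("b");
        List<String> list = new ArrayList<>(List.of("a", "b", "c"));

        // Iterate a copy so removals don't disturb the traversal.
        for (String member : new ArrayList<>(list)) {
          // Before: if (!recipients.contains(member)) { list.remove(member); }
          //         else if (adjunct.contains(member)) { list.remove(member); }
          // After: one condition, identical behavior.
          if (!recipients.contains(member) || adjunct.contains(member)) {
            list.remove(member);
          }
        }
        System.out.println(list); // [a]
      }
    }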
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
           Long cqID = e.getKey();
           // For the CQs satisfying the event with destroy CQEvent, remove
           // the entry form CQ cache.
-          if (cq.getFilterID() == cqID
-              && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
-            cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+          if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
           }
         }
       }
     }
   }

-
   /**
    * Get the adjunct receivers for a partitioned region operation
    *
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {

   /**
    * perform any operation-specific initialization on the given reply processor
-   *
-   * @param p
-   * @param msg
    */
   protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
     // nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
     }
   }

-  /**
-   * @param closedMembers
-   */
   private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
       Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
     if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
       return null;
     }
     CacheDistributionAdvisor advisor;
-    // if (region.isUsedForPartitionedRegionBucket()) {
-    advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
-    // } else {
-    // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
-    // }
+    advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
     return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
   }

@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
     protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);

     protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
-
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;

     public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
       return this.op;
     }

-
     /** sets the concurrency versioning tag for this message */
     public void setVersionTag(VersionTag tag) {
       this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
     /**
      * process a reply
      *
-     * @param reply
-     * @param processor
      * @return true if the reply-processor should continue to process this response
      */
     boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
      * @param event the entry event that contains the old value
      */
     public void appendOldValueToMessage(EntryEventImpl event) {
-      {
-        @Unretained
-        Object val = event.getRawOldValue();
-        if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
-            || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
-          return;
-        }
+      @Unretained
+      Object val = event.getRawOldValue();
+      if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+          || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+        return;
       }
       event.exportOldValue(this);
     }
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {

     protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
       Assert.assertTrue(this.regionPath != null, "regionPath was null");
-      GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+      InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
       return gfc.getRegionByPathForProcessing(this.regionPath);
     }
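The simplified CQ condition above drops an explicit Integer.valueOf(...) around MessageType.LOCAL_DESTROY: passing an int to equals(Object) autoboxes it at the call site, so both forms compare equal Integer objects. A short demonstration (73 is a made-up constant, not the real MessageType value):

    public class AutoboxEqualsSketch {
      static final int LOCAL_DESTROY = 73; // hypothetical message-type constant

      public static void main(String[] args) {
        Object value = Integer.valueOf(73); // plays the role of e.getValue()
        System.out.println(value.equals(Integer.valueOf(LOCAL_DESTROY))); // true
        System.out.println(value.equals(LOCAL_DESTROY)); // true: int autoboxed to Integer
      }
    }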
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
         basicProcess(dm, lclRgn);
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
         // region
         if (!rgn.isEventTrackerInitialized()
             && (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
-          if (logger.isDebugEnabled()) {
+          if (logger.isTraceEnabled()) {
             logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
           }
           return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
           sendReply = operateOnRegion(event, dm) && sendReply;
         } finally {
           if (event instanceof EntryEventImpl) {
-            ((EntryEventImpl) event).release();
+            ((Releasable) event).release();
           }
         }
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Region destroyed: nothing to do", this);
         }
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
         if (!lclRgn.isDestroyed()) {
           logger.error("Got disk access exception, expected region to be destroyed", e);
         }
-      } catch (EntryNotFoundException e) {
+      } catch (EntryNotFoundException ignore) {
         this.appliedOperation = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
       if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
         // distributed-no-ack message. Don't respond
       } else {
-        ReplyException exception = rex;
-        ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+        ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
             isInternal());
       }
     }
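The finally block above now releases the event through the Releasable interface rather than the concrete EntryEventImpl, matching the newly added org.apache.geode.internal.offheap.Releasable import. A sketch of the pattern with illustrative types (testing instanceof against the interface is a variation for the sketch; the diff itself keeps testing the concrete event class):

    interface Releasable { // hypothetical stand-in for Geode's Releasable
      void release();
    }

    class OffHeapEvent implements Releasable {
      @Override
      public void release() {
        System.out.println("off-heap value released");
      }
    }

    class PlainEvent {} // nothing to release

    public class ReleaseSketch {
      static void process(Object event) {
        try {
          // ... operate on the event ...
        } finally {
          // Release through the smallest interface that offers the cleanup,
          // so every resource-holding event type is covered.
          if (event instanceof Releasable) {
            ((Releasable) event).release();
          }
        }
      }

      public static void main(String[] args) {
        process(new OffHeapEvent());
        process(new PlainEvent());
      }
    }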
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
      * When an event is discarded because of an attempt to overwrite a more recent change we still
      * need to deliver that event to clients. Clients can then perform their own concurrency checks
      * on the event.
-     *
-     * @param rgn
-     * @param ev
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@
       rgn.notifyBridgeClients(ev);
     }

-    // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
-    // String path) {
-    // return LocalRegion.getRegionFromPath(sys, path);
-    // }
-
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException;

@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {

     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      // super.fromData(in);
       short bits = in.readShort();
       short extBits = in.readShort();
       this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {

     @Override
     public void toData(DataOutput out) throws IOException {
-      // super.toData(out);
-
       short bits = 0;
       short extendedBits = 0;
       bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
   static class CacheOperationReplyProcessor extends DirectReplyProcessor {
     public CacheOperationMessage msg;

-    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
-        new CopyOnWriteHashSet<InternalDistributedMember>();
+    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();

     public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
       super(system, initMembers);