http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java index d0dd084..bcc998f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java @@ -50,10 +50,6 @@ import org.apache.geode.internal.cache.execute.MultiRegionFunctionContextImpl; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; -/** - * - * - */ public class MemberFunctionStreamingMessage extends DistributionMessage implements TransactionMessage, MessageWithReply { @@ -72,6 +68,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage private int processorId; private int txUniqId = TXManagerImpl.NOTX; + private InternalDistributedMember txMemberId = null; private boolean isFnSerializationReqd; @@ -80,8 +77,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage private boolean isReExecute; - // private final Object lastResultLock = new Object(); - private static final short IS_REEXECUTE = UNRESERVED_FLAGS_START; public MemberFunctionStreamingMessage() {} @@ -124,7 +119,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage if (this.txUniqId == TXManagerImpl.NOTX) { return null; } else { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { // ignore and return, we are shutting down! return null; @@ -134,9 +129,9 @@ public class MemberFunctionStreamingMessage extends DistributionMessage } } - private void cleanupTransasction(TXStateProxy tx) { + private void cleanupTransaction(TXStateProxy tx) { if (this.txUniqId != TXManagerImpl.NOTX) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache == null) { // ignore and return, we are shutting down! return; @@ -167,7 +162,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage ResultSender resultSender = new MemberFunctionResultSender(dm, this, this.functionObject); Set<Region> regions = new HashSet<Region>(); if (this.regionPathSet != null) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); for (String regionPath : this.regionPathSet) { if (checkCacheClosing(dm) || checkDSClosing(dm)) { thr = @@ -181,7 +176,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage FunctionContextImpl context = new MultiRegionFunctionContextImpl(this.functionObject.getId(), this.args, resultSender, regions, isReExecute); - long start = stats.startTime(); stats.startFunctionExecution(this.functionObject.hasResult()); if (logger.isDebugEnabled()) { @@ -235,7 +229,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage SystemFailure.checkFailure(); thr = t; } finally { - cleanupTransasction(tx); + cleanupTransaction(tx); if (thr != null) { rex = new ReplyException(thr); replyWithException(dm, rex); @@ -268,7 +262,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage if ((flags & HAS_TX_ID) != 0) this.txUniqId = in.readInt(); if ((flags & HAS_TX_MEMBERID) != 0) { - this.txMemberId = (InternalDistributedMember) DataSerializer.readObject(in); + this.txMemberId = DataSerializer.readObject(in); } Object object = DataSerializer.readObject(in); @@ -358,8 +352,8 @@ public class MemberFunctionStreamingMessage extends DistributionMessage /** * check to see if the cache is closing */ - final public boolean checkCacheClosing(DistributionManager dm) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + private boolean checkCacheClosing(DistributionManager dm) { + InternalCache cache = GemFireCacheImpl.getInstance(); return (cache == null || cache.getCancelCriterion().isCancelInProgress()); } @@ -368,25 +362,15 @@ public class MemberFunctionStreamingMessage extends DistributionMessage * * @return true if the distributed system is closing */ - final public boolean checkDSClosing(DistributionManager dm) { + private boolean checkDSClosing(DistributionManager dm) { InternalDistributedSystem ds = dm.getSystem(); return (ds == null || ds.isDisconnecting()); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TransactionMessage#canStartRemoteTransaction() - */ public boolean canStartRemoteTransaction() { return true; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.cache.TransactionMessage#getTXUniqId() - */ public int getTXUniqId() { return this.txUniqId; } @@ -400,7 +384,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage } public InternalDistributedMember getTXOriginatorClient() { - // TODO Auto-generated method stub return null; }
http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java index faec43a..bfcf6ff 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java @@ -349,13 +349,6 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp { .toLocalizedString()); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.internal.cache.RegionEntry#getSerializedValueOnDisk(org.apache.geode.internal. - * cache.LocalRegion) - */ public Object getSerializedValueOnDisk(LocalRegion localRegion) { throw new UnsupportedOperationException( LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY http://git-wip-us.apache.org/repos/asf/geode/blob/640b9f03/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 7f84393..f7afecc 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -14,12 +14,49 @@ */ package org.apache.geode.internal.cache; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.SyncFailedException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.Long2ObjectMap; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.SerializationException; @@ -78,52 +115,14 @@ import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.internal.util.IOUtils; import org.apache.geode.internal.util.TransformUtils; import org.apache.geode.pdx.internal.PdxWriterImpl; -import org.apache.logging.log4j.Logger; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.SyncFailedException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; /** * Implements an operation log to write to disk. As of prPersistSprint2 this file only supports * persistent regions. For overflow only regions see {@link OverflowOplog}. * - * * @since GemFire 5.1 */ - -public final class Oplog implements CompactableOplog, Flushable { +public class Oplog implements CompactableOplog, Flushable { private static final Logger logger = LogService.getLogger(); /** Extension of the oplog file * */ @@ -141,8 +140,6 @@ public final class Oplog implements CompactableOplog, Flushable { private final OplogFile drf = new OplogFile(); private final KRFile krf = new KRFile(); - /** preallocated space available for writing to* */ - // volatile private long opLogSpace = 0L; /** The stats for this store */ private final DiskStoreStats stats; @@ -190,6 +187,7 @@ public final class Oplog implements CompactableOplog, Flushable { * The HighWaterMark of recentValues. */ private final AtomicLong totalCount = new AtomicLong(0); + /** * The number of records in this oplog that contain the most recent value of the entry. */ @@ -209,13 +207,13 @@ public final class Oplog implements CompactableOplog, Flushable { * Set to true after the first drf recovery. */ private boolean haveRecoveredDrf = true; + /** * Set to true after the first crf recovery. */ private boolean haveRecoveredCrf = true; - private OpState opState; - /** OPCODES - byte appended before being written to disk* */ + private OpState opState; /** * Written to CRF, and DRF. @@ -239,7 +237,9 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_NEW_ENTRY_BASE_ID = 63; + static final int OPLOG_NEW_ENTRY_BASE_REC_SIZE = 1 + 8 + 1; + /** * Written to CRF. The OplogEntryId is +1 the previous new_entry OplogEntryId. Byte Format: 1: * userBits RegionId 4: valueLength (optional depending on bits) valueLength: value bytes @@ -258,6 +258,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_1ID = 65; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 2 bytes. Byte Format: 1: userBits 2: OplogEntryId RegionId 4: @@ -297,6 +298,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_5ID = 69; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 6 bytes. Byte Format: 1: userBits 6: OplogEntryId RegionId 4: @@ -306,6 +308,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_6ID = 70; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 7 bytes. Byte Format: 1: userBits 7: OplogEntryId RegionId 4: @@ -315,6 +318,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_7ID = 71; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 8 bytes. Byte Format: 1: userBits 8: OplogEntryId RegionId 4: @@ -334,6 +338,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_WITH_KEY_1ID = 73; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 2 bytes. Byte Format: 1: userBits 2: OplogEntryId RegionId 4: @@ -373,6 +378,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_WITH_KEY_5ID = 77; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 6 bytes. Byte Format: 1: userBits 6: OplogEntryId RegionId 4: @@ -382,6 +388,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_WITH_KEY_6ID = 78; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 7 bytes. Byte Format: 1: userBits 7: OplogEntryId RegionId 4: @@ -391,6 +398,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_MOD_ENTRY_WITH_KEY_7ID = 79; + /** * Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed * difference is encoded in 8 bytes. Byte Format: 1: userBits 8: OplogEntryId RegionId 4: @@ -439,6 +447,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_DEL_ENTRY_5ID = 85; + /** * Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed * difference is encoded in 6 bytes. Byte Format: 6: OplogEntryId 1: EndOfRecord @@ -446,6 +455,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_DEL_ENTRY_6ID = 86; + /** * Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed * difference is encoded in 7 bytes. Byte Format: 7: OplogEntryId 1: EndOfRecord @@ -453,6 +463,7 @@ public final class Oplog implements CompactableOplog, Flushable { * @since GemFire prPersistSprint1 */ private static final byte OPLOG_DEL_ENTRY_7ID = 87; + /** * Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed * difference is encoded in 8 bytes. Byte Format: 8: OplogEntryId 1: EndOfRecord @@ -488,6 +499,7 @@ public final class Oplog implements CompactableOplog, Flushable { * such as 7.0.0.beta EndOfRecord */ private static final byte OPLOG_GEMFIRE_VERSION = 91; + static final int OPLOG_GEMFIRE_VERSION_REC_SIZE = 1 + 3 + 1; /** @@ -499,14 +511,14 @@ public final class Oplog implements CompactableOplog, Flushable { */ static final byte OPLOG_MAGIC_SEQ_ID = 92; - public static enum OPLOG_TYPE { + public enum OPLOG_TYPE { CRF(new byte[] {0x47, 0x46, 0x43, 0x52, 0x46, 0x31}), // GFCRF1 DRF(new byte[] {0x47, 0x46, 0x44, 0x52, 0x46, 0x31}), // GFDRF1 IRF(new byte[] {0x47, 0x46, 0x49, 0x52, 0x46, 0x31}), // GFIRF1 KRF(new byte[] {0x47, 0x46, 0x4b, 0x52, 0x46, 0x31}), // GFKRF1 IF(new byte[] {0x47, 0x46, 0x49, 0x46, 0x30, 0x31}); // GFIF01 - private byte[] bytes; + private final byte[] bytes; OPLOG_TYPE(byte[] byteSeq) { this.bytes = byteSeq; @@ -527,10 +539,10 @@ public final class Oplog implements CompactableOplog, Flushable { private final boolean compactOplogs; /** - * Asif: This object is used to correctly identify the OpLog size so as to cause a switch of - * oplogs + * This object is used to correctly identify the OpLog size so as to cause a switch of oplogs */ final Object lock = new Object(); + final ByteBuffer[] bbArray = new ByteBuffer[2]; private boolean lockedForKRFcreate = false; @@ -542,16 +554,7 @@ public final class Oplog implements CompactableOplog, Flushable { private boolean doneAppending = false; /** - * Extra bytes to be skipped before reading value bytes. Value is currently 6 : 1 byte for opcode, - * 1 byte for userbits and 4 bytes for value length. - */ - private static final long SKIP_BYTES = 6; - - private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); - - // ///////////////////// Constructors //////////////////////// - /** - * Creates new <code>Oplog</code> for the given region. + * Creates new {@code Oplog} for the given region. * * @param oplogId int identifying the new oplog * @param dirHolder The directory in which to create new Oplog @@ -620,8 +623,8 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: A copy constructor used for creating a new oplog based on the previous Oplog. This - * constructor is invoked only from the function switchOplog + * A copy constructor used for creating a new oplog based on the previous Oplog. This constructor + * is invoked only from the function switchOplog * * @param oplogId integer identifying the new oplog * @param dirHolder The directory in which to create new Oplog @@ -773,7 +776,7 @@ public final class Oplog implements CompactableOplog, Flushable { this.dirHolder.incrementTotalOplogSize(getOpStateSize()); } - public final Version currentRecoveredGFVersion() { + public Version currentRecoveredGFVersion() { return this.gfversion; } @@ -790,7 +793,6 @@ public final class Oplog implements CompactableOplog, Flushable { * @param olf the oplog to write to * @param diskRegions the set of disk regions we should write the RVV of * @param writeGCRVV true to write write the GC RVV - * @throws IOException */ private void writeRVVRecord(OplogFile olf, Map<Long, AbstractDiskRegion> diskRegions, boolean writeGCRVV) throws IOException { @@ -835,9 +837,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * This constructor will get invoked only in case of persistent region when it is recovering an * oplog. - * - * @param oplogId - * @param parent */ Oplog(long oplogId, PersistentOplogSet parent) { // @todo have the crf and drf use different directories. @@ -850,8 +849,7 @@ public final class Oplog implements CompactableOplog, Flushable { this.parent = parent.getParent(); this.oplogSet = parent; this.opState = new OpState(); - long maxOplogSizeParam = getParent().getMaxOplogSizeInBytes(); - this.maxOplogSize = maxOplogSizeParam; + this.maxOplogSize = getParent().getMaxOplogSizeInBytes(); setMaxCrfDrfSize(); this.stats = getParent().getStats(); this.compactOplogs = getParent().getAutoCompact(); @@ -1084,8 +1082,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * Creates the crf oplog file - * - * @throws IOException */ private void createCrf(OplogFile prevOlf) throws IOException { File f = new File(this.diskFile.getPath() + CRF_FILE_EXT); @@ -1121,14 +1117,12 @@ public final class Oplog implements CompactableOplog, Flushable { prevOlf.writeBuf = null; return result; } else { - return ByteBuffer.allocateDirect(Integer.getInteger("WRITE_BUF_SIZE", 32768).intValue()); + return ByteBuffer.allocateDirect(Integer.getInteger("WRITE_BUF_SIZE", 32768)); } } /** * Creates the drf oplog file - * - * @throws IOException */ private void createDrf(OplogFile prevOlf) throws IOException { File f = new File(this.diskFile.getPath() + DRF_FILE_EXT); @@ -1150,27 +1144,18 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Returns the <code>DiskStoreStats</code> for this oplog + * Returns the {@code DiskStoreStats} for this oplog */ public DiskStoreStats getStats() { return this.stats; } /** - * Flushes any pending writes to disk. - * - * public final void flush() { forceFlush(); } - */ - - /** * Test Method to be used only for testing purposes. Gets the underlying File object for the Oplog * . Oplog class uses this File object to obtain the RandomAccessFile object. Before returning the * File object , the dat present in the buffers of the RandomAccessFile object is flushed. * Otherwise, for windows the actual file length does not match with the File size obtained from * the File object - * - * @throws IOException - * @throws SyncFailedException */ File getOplogFile() throws SyncFailedException, IOException { // @todo check callers for drf @@ -1271,7 +1256,7 @@ public final class Oplog implements CompactableOplog, Flushable { * present. @param faultingIn @param bitOnly boolean indicating whether to extract just the * UserBit or UserBit with value @return BytesAndBits object wrapping the value & user bit */ - public final BytesAndBits getBytesAndBits(DiskRegionView dr, DiskId id, boolean faultingIn, + public BytesAndBits getBytesAndBits(DiskRegionView dr, DiskId id, boolean faultingIn, boolean bitOnly) { Oplog retryOplog = null; long offset = 0; @@ -1292,22 +1277,18 @@ public final class Oplog implements CompactableOplog, Flushable { BytesAndBits bb = null; long start = this.stats.startRead(); - // Asif: If the offset happens to be -1, still it is possible that + // If the offset happens to be -1, still it is possible that // the data is present in the current oplog file. if (offset == -1) { - // Asif: Since it is given that a get operation has alreadty + // Since it is given that a get operation has alreadty // taken a // lock on an entry , no put operation could have modified the // oplog ID // there fore synchronization is not needed - // synchronized (id) { - // if (id.getOplogId() == this.oplogId) { offset = id.getOffsetInOplog(); - // } - // } } - // Asif :If the current OpLog is not destroyed ( its opLogRaf file + // If the current OpLog is not destroyed ( its opLogRaf file // is still open) we can retrieve the value from this oplog. try { bb = basicGet(dr, offset, bitOnly, id.getValueLength(), id.getUserBits()); @@ -1321,7 +1302,7 @@ public final class Oplog implements CompactableOplog, Flushable { if (bb == null) { throw new EntryDestroyedException( LocalizedStrings.Oplog_NO_VALUE_WAS_FOUND_FOR_ENTRY_WITH_DISK_ID_0_ON_A_REGION_WITH_SYNCHRONOUS_WRITING_SET_TO_1 - .toLocalizedString(new Object[] {id, Boolean.valueOf(dr.isSync())})); + .toLocalizedString(new Object[] {id, dr.isSync()})); } if (bitOnly) { dr.endRead(start, this.stats.endRead(start, 1), 1); @@ -1339,17 +1320,14 @@ public final class Oplog implements CompactableOplog, Flushable { * HTree with the oplog being destroyed * * @param id A DiskId object for which the value on disk will be fetched - * */ - public final BytesAndBits getNoBuffer(DiskRegion dr, DiskId id) { + public BytesAndBits getNoBuffer(DiskRegion dr, DiskId id) { if (logger.isDebugEnabled()) { logger.debug("Oplog::getNoBuffer:Before invoking Oplog.basicGet for DiskID ={}", id); } try { - BytesAndBits bb = - basicGet(dr, id.getOffsetInOplog(), false, id.getValueLength(), id.getUserBits()); - return bb; + return basicGet(dr, id.getOffsetInOplog(), false, id.getValueLength(), id.getUserBits()); } catch (DiskAccessException dae) { logger.error(LocalizedMessage.create( LocalizedStrings.Oplog_OPLOGGETNOBUFFEREXCEPTION_IN_RETRIEVING_VALUE_FROM_DISK_FOR_DISKID_0, @@ -1612,32 +1590,24 @@ public final class Oplog implements CompactableOplog, Flushable { + getParent().getInitFile() + "\". Drf did not contain a disk store id.", getParent()); } - } catch (EOFException ex) { + } catch (EOFException ignore) { // ignore since a partial record write can be caused by a crash - // if (byteCount < fileLength) { - // throw new - // DiskAccessException(LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0 - // .toLocalizedString(drfFile.getPath()), ex, getParent()); - // }// else do nothing, this is expected in crash scenarios } catch (IOException ex) { getParent().getCancelCriterion().checkCancelInProgress(ex); throw new DiskAccessException( LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0 .toLocalizedString(drfFile.getPath()), ex, getParent()); - } catch (CancelException ignore) { + } catch (CancelException e) { if (logger.isDebugEnabled()) { - logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", ignore); + logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", e); } - } catch (RegionDestroyedException ignore) { + } catch (RegionDestroyedException e) { if (logger.isDebugEnabled()) { - logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", ignore); + logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", e); } - } catch (IllegalStateException ex) { - // @todo - // if (!rgn.isClosed()) { - throw ex; - // } + } catch (IllegalStateException e) { + throw e; } // Add the Oplog size to the Directory Holder which owns this oplog, // so that available space is correctly calculated & stats updated. @@ -1711,7 +1681,7 @@ public final class Oplog implements CompactableOplog, Flushable { FileInputStream fis; try { fis = new FileInputStream(f); - } catch (FileNotFoundException ex) { + } catch (FileNotFoundException ignore) { return false; } try { @@ -1735,7 +1705,7 @@ public final class Oplog implements CompactableOplog, Flushable { validateOpcode(dis, OPLOG_DISK_STORE_ID); readDiskStoreRecord(dis, f); - } catch (DiskAccessException notInNewFormatErr) { + } catch (DiskAccessException ignore) { // Failed to read the file. There are two possibilities. Either this // file is in old format which does not have a magic seq in the // beginning or this is not a valid file at all. Try reading it as a @@ -1744,7 +1714,7 @@ public final class Oplog implements CompactableOplog, Flushable { fis = new FileInputStream(f); dis = new DataInputStream(new BufferedInputStream(fis, 1024 * 1024)); readDiskStoreRecord(dis, f); - } catch (IllegalStateException notOldFileErr) { + } catch (IllegalStateException ignore) { // Failed to read the file. There are two possibilities. Either this // is in new format which has a magic seq in the beginning or this is // not a valid file at all @@ -2023,32 +1993,24 @@ public final class Oplog implements CompactableOplog, Flushable { + getParent().getInitFile() + "\". Crf did not contain a disk store id.", getParent()); } - } catch (EOFException ex) { + } catch (EOFException ignore) { // ignore since a partial record write can be caused by a crash - // if (byteCount < fileLength) { - // throw new - // DiskAccessException(LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0 - // .toLocalizedString(this.crf.f.getPath()), ex, getParent()); - // }// else do nothing, this is expected in crash scenarios } catch (IOException ex) { getParent().getCancelCriterion().checkCancelInProgress(ex); throw new DiskAccessException( LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0 .toLocalizedString(this.crf.f.getPath()), ex, getParent()); - } catch (CancelException ignore) { + } catch (CancelException e) { if (logger.isDebugEnabled()) { - logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", ignore); + logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", e); } - } catch (RegionDestroyedException ignore) { + } catch (RegionDestroyedException e) { if (logger.isDebugEnabled()) { - logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", ignore); + logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", e); } - } catch (IllegalStateException ex) { - // @todo - // if (!rgn.isClosed()) { - throw ex; - // } + } catch (IllegalStateException e) { + throw e; } // Add the Oplog size to the Directory Holder which owns this oplog, @@ -2109,7 +2071,7 @@ public final class Oplog implements CompactableOplog, Flushable { if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY)) { StringBuffer sb = new StringBuffer(); for (int i = 0; i < OPLOG_TYPE.getLen(); i++) { - sb.append(" " + seq[i]); + sb.append(" ").append(seq[i]); } logger.trace(LogMarker.PERSIST_RECOVERY, "oplog magic code: {}", sb); } @@ -2222,7 +2184,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } } else { - boolean rvvTrusted = InternalDataSerializer.readBoolean(dis); + boolean rvvTrusted = DataSerializer.readBoolean(dis); if (drs != null) { if (latestOplog) { // only set rvvtrust based on the newest oplog recovered @@ -2491,8 +2453,6 @@ public final class Oplog implements CompactableOplog, Flushable { * * @param dis DataInputStream from which the oplog is being read * @param opcode byte whether the id is short/int/long - * @param recoverValue - * @throws IOException */ private void readNewEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds, boolean recoverValue, final LocalRegion currentRegion, Version version, ByteArrayDataInput in, @@ -2679,10 +2639,6 @@ public final class Oplog implements CompactableOplog, Flushable { * * @param dis DataInputStream from which the oplog is being read * @param opcode byte whether the id is short/int/long - * @param recoverValue - * @param currentRegion - * @param keyRequiresRegionContext - * @throws IOException */ private void readModifyEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds, boolean recoverValue, LocalRegion currentRegion, Version version, ByteArrayDataInput in, @@ -2891,7 +2847,7 @@ public final class Oplog implements CompactableOplog, Flushable { DiskEntry.Helper.readSerializedValue(valueBytes, version, in, true); } catch (SerializationException ex) { if (logger.isDebugEnabled()) { - logger.debug("Could not deserialize recovered value: {}" + ex.getCause(), ex); + logger.debug("Could not deserialize recovered value: {}", ex.getCause(), ex); } } } @@ -2904,9 +2860,6 @@ public final class Oplog implements CompactableOplog, Flushable { * * @param dis DataInputStream from which the oplog is being read * @param opcode byte whether the id is short/int/long - * @param deletedIds - * @param recoverValue - * @throws IOException */ private void readModifyEntryWithKey(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds, boolean recoverValue, final LocalRegion currentRegion, @@ -3099,12 +3052,9 @@ public final class Oplog implements CompactableOplog, Flushable { * @param dis DataInputStream from which the oplog is being read * @param opcode byte whether the id is short/int/long * @param parent instance of disk region - * @throws IOException */ private void readDelEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds, - DiskStoreImpl parent) throws IOException - - { + DiskStoreImpl parent) throws IOException { int idByteCount = (opcode - OPLOG_DEL_ENTRY_1ID) + 1; // long debugRecoverDelEntryId = this.recoverDelEntryId; long oplogKeyId = getDelEntryId(dis, idByteCount); @@ -3161,8 +3111,6 @@ public final class Oplog implements CompactableOplog, Flushable { * Returns true if it is ok the skip the current modify record which had the given oplogEntryId. * It is ok to skip if any of the following are true: 1. deletedIds contains the id 2. the last * modification of the entry was done by a record read from an oplog other than this oplog - * - * @param tag */ private OkToSkipResult okToSkipModifyRecord(OplogEntryIdSet deletedIds, long drId, DiskRecoveryStore drs, long oplogEntryId, boolean checkRecoveryMap, VersionTag tag) { @@ -3230,8 +3178,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * Returns true if the drId region has been destroyed or if oplogKeyId preceeds the last clear * done on the drId region - * - * @param tag */ private OkToSkipResult okToSkipRegion(DiskRegionView drv, long oplogKeyId, VersionTag tag) { long lastClearKeyId = drv.getClearOplogEntryId(); @@ -3300,25 +3246,16 @@ public final class Oplog implements CompactableOplog, Flushable { assert idByteCount >= 1 && idByteCount <= 8 : idByteCount; long delta; - byte firstByte = dis.readByte(); - // if (firstByte < 0) { - // delta = 0xFFFFFFFFFFFFFF00L | firstByte; - // } else { - // delta = firstByte; - // } - delta = firstByte; + delta = dis.readByte(); idByteCount--; while (idByteCount > 0) { delta <<= 8; delta |= (0x00FF & dis.readByte()); idByteCount--; } - // this.lastDelta = delta; // HACK DEBUG return delta; } - // private long lastDelta; // HACK DEBUG - /** * Call this when the cache is closed or region is destroyed. Deletes the lock files. */ @@ -3501,8 +3438,6 @@ public final class Oplog implements CompactableOplog, Flushable { * @param opCode The int value identifying whether it is create/modify or delete operation * @param entry The DiskEntry object being operated upon * @param value The byte array representing the value - * @param userBits - * @throws IOException */ private void initOpState(byte opCode, DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits, boolean notToUseUserBits) throws IOException { @@ -3545,17 +3480,14 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: Modified the code so as to reuse the already created ByteBuffer during transition. - * Creates a key/value pair from a region entry on disk. Updates all of the necessary + * Modified the code so as to reuse the already created ByteBuffer during transition. Creates a + * key/value pair from a region entry on disk. Updates all of the necessary * {@linkplain DiskStoreStats statistics} and invokes basicCreate * * @param entry The DiskEntry object for this key/value pair. * @param value byte array representing the value - * @throws DiskAccessException - * @throws IllegalStateException - * */ - public final void create(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) { + public void create(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) { if (this != getOplogSet().getChild()) { getOplogSet().getChild().create(region, entry, value, async); @@ -3612,13 +3544,11 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: A helper function which identifies whether to create the entry in the current oplog or to - * make the switch to the next oplog. This function enables us to reuse the byte buffer which got + * A helper function which identifies whether to create the entry in the current oplog or to make + * the switch to the next oplog. This function enables us to reuse the byte buffer which got * created for an oplog which no longer permits us to use itself * * @param entry DiskEntry object representing the current Entry - * @throws IOException - * @throws InterruptedException */ private void basicCreate(DiskRegion dr, DiskEntry entry, ValueWrapper value, byte userBits, boolean async) throws IOException, InterruptedException { @@ -3634,7 +3564,7 @@ public final class Oplog implements CompactableOplog, Flushable { // contention point // synchronized (this.crf) { initOpState(OPLOG_NEW_ENTRY_0ID, dr, entry, value, userBits, false); - // Asif : Check if the current data in ByteBuffer will cause a + // Check if the current data in ByteBuffer will cause a // potential increase in the size greater than the max allowed long temp = (getOpStateSize() + this.crf.currSize); if (!this.wroteNewEntryBase) { @@ -3662,10 +3592,10 @@ public final class Oplog implements CompactableOplog, Flushable { id.setKeyId(createOplogEntryId); // startPosForSynchOp = this.crf.currSize; - // Asif: Allow it to be added to the OpLOg so increase the + // Allow it to be added to the OpLOg so increase the // size of currenstartPosForSynchOpt oplog int dataLength = getOpStateSize(); - // Asif: It is necessary that we set the + // It is necessary that we set the // Oplog ID here without releasing the lock on object as we are // writing to the file after releasing the lock. This can cause // a situation where the @@ -3705,7 +3635,7 @@ public final class Oplog implements CompactableOplog, Flushable { if (logger.isTraceEnabled()) { logger.trace("Oplog::basicCreate:Release dByteBuffer with data for Disk ID = {}", id); } - // Asif: As such for any put or get operation , a synch is taken + // As such for any put or get operation , a synch is taken // on the Entry object in the DiskEntry's Helper functions. // Compactor thread will also take a lock on entry object. Therefore // we do not require a lock on DiskID, as concurrent access for @@ -3767,15 +3697,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * This oplog will be forced to switch to a new oplog - * - * - * public void forceRolling() { if (getOplogSet().getChild() == this) { synchronized (this.lock) { - * if (getOplogSet().getChild() == this) { switchOpLog(0, null); } } if (!this.sync) { - * this.writer.activateThreadToTerminate(); } } } - */ - - /** - * This oplog will be forced to switch to a new oplog */ void forceRolling(DiskRegion dr) { if (getOplogSet().getChild() == this) { @@ -3798,11 +3719,11 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: This function is used to switch from one op Log to another , when the size of the current - * oplog has reached the maximum permissible. It is always called from synch block with lock - * object being the OpLog File object We will reuse the ByteBuffer Pool. We should add the current - * Oplog for compaction first & then try to get next directory holder as in case there is only a - * single directory with space being full, compaction has to happen before it can be given a new + * This function is used to switch from one op Log to another , when the size of the current oplog + * has reached the maximum permissible. It is always called from synch block with lock object + * being the OpLog File object We will reuse the ByteBuffer Pool. We should add the current Oplog + * for compaction first & then try to get next directory holder as in case there is only a single + * directory with space being full, compaction has to happen before it can be given a new * directory. If the operation causing the switching is on an Entry which already is referencing * the oplog to be compacted, then the compactor thread will skip compaction that entry & the * switching thread will roll the entry explicitly. @@ -3905,7 +3826,7 @@ public final class Oplog implements CompactableOplog, Flushable { createKrfAsync(); } } catch (DiskAccessException dae) { - // Asif: Remove the Oplog which was added in the DiskStoreImpl + // Remove the Oplog which was added in the DiskStoreImpl // for compaction as compaction cannot be done. // However, it is also possible that compactor // may have done the compaction of the Oplog but the switching thread @@ -3919,7 +3840,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * Schedule a task to create a krf asynchronously - * */ protected void createKrfAsync() { getParent().executeDiskStoreTask(new Runnable() { @@ -4313,10 +4233,10 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif:This function retrieves the value for an entry being compacted subject to entry - * referencing the oplog being compacted. Attempt is made to retrieve the value from in memory , - * if available, else from asynch buffers ( if asynch mode is enabled), else from the Oplog being - * compacted. It is invoked from switchOplog as well as OplogCompactor's compact function. + * This function retrieves the value for an entry being compacted subject to entry referencing the + * oplog being compacted. Attempt is made to retrieve the value from in memory , if available, + * else from asynch buffers ( if asynch mode is enabled), else from the Oplog being compacted. It + * is invoked from switchOplog as well as OplogCompactor's compact function. * * @param entry DiskEntry being compacted referencing the Oplog being compacted * @param wrapper Object of type BytesAndBitsForCompactor. The data if found is set in the wrapper @@ -4335,10 +4255,9 @@ public final class Oplog implements CompactableOplog, Flushable { @Released Object value = entry._getValueRetain(dr, true); ReferenceCountHelper.unskipRefCountTracking(); - // TODO:KIRK:OK Object value = entry.getValueWithContext(dr); boolean foundData = false; if (value == null) { - // Asif: If the mode is synch it is guaranteed to be present in the disk + // If the mode is synch it is guaranteed to be present in the disk foundData = basicGetForCompactor(dr, oplogOffset, false, did.getValueLength(), did.getUserBits(), wrapper); // after we have done the get do one more check to see if the @@ -4423,7 +4342,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } else if (value instanceof byte[]) { byte[] valueBytes = (byte[]) value; - // Asif: If the value is already a byte array then the user bit + // If the value is already a byte array then the user bit // is 0, which is the default value of the userBits variable, // indicating that it is non serialized data. Thus it is // to be used as it is & not to be deserialized to @@ -4469,20 +4388,16 @@ public final class Oplog implements CompactableOplog, Flushable { /** * Modifies a key/value pair from a region entry on disk. Updates all of the necessary * {@linkplain DiskStoreStats statistics} and invokes basicModify + * <p> + * Modified the code so as to reuse the already created ByteBuffer during transition. Minimizing + * the synchronization allowing multiple put operations for different entries to proceed + * concurrently for asynch mode * * @param entry DiskEntry object representing the current Entry * * @param value byte array representing the value - * @throws DiskAccessException - * @throws IllegalStateException - */ - /* - * Asif: Modified the code so as to reuse the already created ByteBuffer during transition. - * Minimizing the synchronization allowing multiple put operations for different entries to - * proceed concurrently for asynch mode */ - public final void modify(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) { - + public void modify(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) { if (getOplogSet().getChild() != this) { getOplogSet().getChild().modify(region, entry, value, async); } else { @@ -4559,10 +4474,9 @@ public final class Oplog implements CompactableOplog, Flushable { .toLocalizedString(this.diskFile.getPath()), ie, drv.getName()); } - } - public final void saveConflictVersionTag(LocalRegion region, VersionTag tag, boolean async) { + public void saveConflictVersionTag(LocalRegion region, VersionTag tag, boolean async) { if (getOplogSet().getChild() != this) { getOplogSet().getChild().saveConflictVersionTag(region, tag, async); } else { @@ -4581,8 +4495,8 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private final void copyForwardForOfflineCompact(long oplogKeyId, byte[] keyBytes, - byte[] valueBytes, byte userBits, long drId, VersionTag tag) { + private void copyForwardForOfflineCompact(long oplogKeyId, byte[] keyBytes, byte[] valueBytes, + byte userBits, long drId, VersionTag tag) { try { basicCopyForwardForOfflineCompact(oplogKeyId, keyBytes, valueBytes, userBits, drId, tag); } catch (IOException ex) { @@ -4600,7 +4514,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private final void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry, + private void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry, BytesAndBitsForCompactor wrapper) { if (getOplogSet().getChild() != this) { getOplogSet().getChild().copyForwardModifyForCompact(dr, entry, wrapper); @@ -4646,14 +4560,12 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: A helper function which identifies whether to modify the entry in the current oplog or to - * make the switch to the next oplog. This function enables us to reuse the byte buffer which got + * A helper function which identifies whether to modify the entry in the current oplog or to make + * the switch to the next oplog. This function enables us to reuse the byte buffer which got * created for an oplog which no longer permits us to use itself. It will also take acre of * compaction if required * * @param entry DiskEntry object representing the current Entry - * @throws IOException - * @throws InterruptedException */ private void basicModify(DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits, boolean async, boolean calledByCompactor) throws IOException, InterruptedException { @@ -4813,7 +4725,7 @@ public final class Oplog implements CompactableOplog, Flushable { this.crf.currSize = temp; if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { logger.trace(LogMarker.PERSIST_WRITES, - "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#", dr.getId(), tag, + "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#{}", dr.getId(), tag, getOplogId()); } this.dirHolder.incrementTotalOplogSize(adjustment); @@ -4870,7 +4782,7 @@ public final class Oplog implements CompactableOplog, Flushable { logger.trace(LogMarker.PERSIST_WRITES, "basicCopyForwardForOfflineCompact: id=<{}> keyBytes=<{}> valueOffset={} userBits={} valueLen={} valueBytes=<{}> drId={} oplog#{}", oplogKeyId, baToString(keyBytes), startPosForSynchOp, userBits, valueBytes.length, - baToString(valueBytes), getOplogId()); + baToString(valueBytes), drId, getOplogId()); } this.dirHolder.incrementTotalOplogSize(adjustment); @@ -4878,7 +4790,6 @@ public final class Oplog implements CompactableOplog, Flushable { } clearOpState(); } - // } } if (useNextOplog) { if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { @@ -4958,7 +4869,7 @@ public final class Oplog implements CompactableOplog, Flushable { * * @param entry DiskEntry object on which remove operation is called */ - public final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) { + public void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) { DiskRegion dr = region.getDiskRegion(); if (getOplogSet().getChild() != this) { getOplogSet().getChild().remove(region, entry, async, isClear); @@ -4987,16 +4898,14 @@ public final class Oplog implements CompactableOplog, Flushable { did.setValueLength(len); did.setUserBits(prevUsrBit); } - } - } } /** * Write the GC RVV for a single region to disk */ - public final void writeGCRVV(DiskRegion dr) { + public void writeGCRVV(DiskRegion dr) { boolean useNextOplog = false; synchronized (this.lock) { if (getOplogSet().getChild() != this) { @@ -5081,15 +4990,12 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * - * Asif: A helper function which identifies whether to record a removal of entry in the current - * oplog or to make the switch to the next oplog. This function enables us to reuse the byte - * buffer which got created for an oplog which no longer permits us to use itself. It will also - * take acre of compaction if required + * A helper function which identifies whether to record a removal of entry in the current oplog or + * to make the switch to the next oplog. This function enables us to reuse the byte buffer which + * got created for an oplog which no longer permits us to use itself. It will also take acre of + * compaction if required * * @param entry DiskEntry object representing the current Entry - * @throws IOException - * @throws InterruptedException */ private void basicRemove(DiskRegionView dr, DiskEntry entry, boolean async, boolean isClear) throws IOException, InterruptedException { @@ -5133,7 +5039,7 @@ public final class Oplog implements CompactableOplog, Flushable { } // Write the data to the opLog for the synch mode - // @todo if we don't sync write destroys what will happen if + // TODO: if we don't sync write destroys what will happen if // we do 1. create k1 2. destroy k1 3. create k1? // It would be possible for the crf to be flushed but not the drf. // Then during recovery we will find identical keys with different @@ -5145,13 +5051,11 @@ public final class Oplog implements CompactableOplog, Flushable { // because we might be killed right after we do this write. startPosForSynchOp = writeOpLogBytes(this.drf, async, true); setHasDeletes(true); - if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) { + if (logger.isDebugEnabled(LogMarker.PERSIST_WRITES)) { logger.debug("basicRemove: id=<{}> key=<{}> drId={} oplog#{}", abs(id.getKeyId()), entry.getKey(), dr.getId(), getOplogId()); } - // new RuntimeException("STACK")); - if (logger.isTraceEnabled()) { logger.trace("Oplog::basicRemove:Released ByteBuffer for Disk ID = {}", id); } @@ -5161,17 +5065,15 @@ public final class Oplog implements CompactableOplog, Flushable { id.setOffsetInOplog(-1); EntryLogger.logPersistDestroy(dr.getName(), entry.getKey(), dr.getDiskStoreID()); - { - Oplog rmOplog = null; - if (oldOplogId == getOplogId()) { - rmOplog = this; - } else { - rmOplog = getOplogSet().getChild(oldOplogId); - } - if (rmOplog != null) { - rmOplog.rmLive(dr, entry); - emptyOplog = rmOplog; - } + Oplog rmOplog = null; + if (oldOplogId == getOplogId()) { + rmOplog = this; + } else { + rmOplog = getOplogSet().getChild(oldOplogId); + } + if (rmOplog != null) { + rmOplog.rmLive(dr, entry); + emptyOplog = rmOplog; } clearOpState(); } @@ -5193,21 +5095,15 @@ public final class Oplog implements CompactableOplog, Flushable { } } - // /** - // * This is only used for an assertion check. - // */ - // private long lastWritePos = -1; - /** * test hook */ - public final ByteBuffer getWriteBuf() { + ByteBuffer getWriteBuf() { return this.crf.writeBuf; } - private final void flushNoSync(OplogFile olf) throws IOException { + private void flushNoSync(OplogFile olf) throws IOException { flushAllNoSync(false); // @todo - // flush(olf, false); } @Override @@ -5226,14 +5122,13 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private final void flushAndSync(OplogFile olf) throws IOException { - flushAll(false); // @todo - // flush(olf, true); + private void flushAndSync(OplogFile olf) throws IOException { + flushAll(false); } private static final int MAX_CHANNEL_RETRIES = 5; - private final void flush(OplogFile olf, boolean doSync) throws IOException { + private void flush(OplogFile olf, boolean doSync) throws IOException { try { synchronized (this.lock/* olf */) { if (olf.RAFClosed) { @@ -5290,7 +5185,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException { + private void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException { try { synchronized (this.lock/* olf */) { if (olf.RAFClosed) { @@ -5318,24 +5213,22 @@ public final class Oplog implements CompactableOplog, Flushable { } } - public final void flushAll() { + public void flushAll() { flushAll(false); } - public final void flushAllNoSync(boolean skipDrf) { + public void flushAllNoSync(boolean skipDrf) { flushAll(skipDrf, false); } - public final void flushAll(boolean skipDrf) { + public void flushAll(boolean skipDrf) { flushAll(skipDrf, true/* doSync */); } - public final void flushAll(boolean skipDrf, boolean doSync) { + public void flushAll(boolean skipDrf, boolean doSync) { try { - // if (!skipDrf) { - // @todo if skipDrf then only need to do drf if crf has flushable data + // TODO: if skipDrf then only need to do drf if crf has flushable data flush(this.drf, doSync); - // } flush(this.crf, doSync); } catch (IOException ex) { getParent().getCancelCriterion().checkCancelInProgress(ex); @@ -5346,13 +5239,13 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: Since the ByteBuffer being writen to can have additional bytes which are used for - * extending the size of the file, it is necessary that the ByteBuffer provided should have limit - * which is set to the position till which it contains the actual bytes. If the mode is synched - * write then only we will write up to the capacity & opLogSpace variable have any meaning. For - * asynch mode it will be zero. Also this method must be synchronized on the file , whether we use - * synch or asynch write because the fault in operations can clash with the asynch writing. Write - * the specified bytes to the oplog. Note that since extending a file is expensive this code will + * Since the ByteBuffer being writen to can have additional bytes which are used for extending the + * size of the file, it is necessary that the ByteBuffer provided should have limit which is set + * to the position till which it contains the actual bytes. If the mode is synched write then only + * we will write up to the capacity & opLogSpace variable have any meaning. For asynch mode it + * will be zero. Also this method must be synchronized on the file , whether we use synch or + * asynch write because the fault in operations can clash with the asynch writing. Write the + * specified bytes to the oplog. Note that since extending a file is expensive this code will * possibly write OPLOG_EXTEND_SIZE zero bytes to reduce the number of times the file is extended. * * @@ -5368,7 +5261,7 @@ public final class Oplog implements CompactableOplog, Flushable { Assert.assertTrue(false, "The Oplog " + this.oplogId + " for store " + getParent().getName() + " has been closed for synch mode while writing is going on. This should not happen"); } - // Asif : It is assumed that the file pointer is already at the + // It is assumed that the file pointer is already at the // appropriate position in the file so as to allow writing at the end. // Any fault in operations will set the pointer back to the write // location. @@ -5457,10 +5350,9 @@ public final class Oplog implements CompactableOplog, Flushable { // + " oplog #" + getOplogId(), this.owner); // } this.beingRead = true; - final long readPosition = offsetInOplog; if (/* * !getParent().isSync() since compactor groups writes && - */(readPosition + valueLength) > this.crf.bytesFlushed && !this.closed) { + */(offsetInOplog + valueLength) > this.crf.bytesFlushed && !this.closed) { flushAllNoSync(true); // fix for bug 41205 } try { @@ -5482,20 +5374,19 @@ public final class Oplog implements CompactableOplog, Flushable { try { final long writePosition = (this.doneAppending) ? this.crf.bytesFlushed : myRAF.getFilePointer(); - if ((readPosition + valueLength) > writePosition) { + if ((offsetInOplog + valueLength) > writePosition) { throw new DiskAccessException( LocalizedStrings.Oplog_TRIED_TO_SEEK_TO_0_BUT_THE_FILE_LENGTH_IS_1_OPLOG_FILE_OBJECT_USED_FOR_READING_2 - .toLocalizedString( - new Object[] {readPosition + valueLength, writePosition, this.crf.raf}), + .toLocalizedString(offsetInOplog + valueLength, writePosition, this.crf.raf), dr.getName()); - } else if (readPosition < 0) { + } else if (offsetInOplog < 0) { throw new DiskAccessException( - LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1.toLocalizedString( - new Object[] {offsetInOplog, this.diskFile.getPath()}), + LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1 + .toLocalizedString(offsetInOplog, this.diskFile.getPath()), dr.getName()); } try { - myRAF.seek(readPosition); + myRAF.seek(offsetInOplog); this.stats.incOplogSeeks(); byte[] valueBytes = new byte[valueLength]; myRAF.readFully(valueBytes); @@ -5543,7 +5434,7 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: Extracts the Value byte array & UserBit from the OpLog + * Extracts the Value byte array & UserBit from the OpLog * * @param offsetInOplog The starting position from which to read the data in the opLog * @param bitOnly boolean indicating whether the value needs to be extracted along with the @@ -5574,7 +5465,7 @@ public final class Oplog implements CompactableOplog, Flushable { try { bb = attemptGet(dr, offsetInOplog, bitOnly, valueLength, userBits); break; - } catch (InterruptedIOException e) { // bug 39756 + } catch (InterruptedIOException ignore) { // bug 39756 // ignore, we'll clear and retry. } finally { if (interrupted) { @@ -5586,10 +5477,8 @@ public final class Oplog implements CompactableOplog, Flushable { getParent().getCancelCriterion().checkCancelInProgress(ex); throw new DiskAccessException( LocalizedStrings.Oplog_FAILED_READING_FROM_0_OPLOGID_1_OFFSET_BEING_READ_2_CURRENT_OPLOG_SIZE_3_ACTUAL_FILE_SIZE_4_IS_ASYNCH_MODE_5_IS_ASYNCH_WRITER_ALIVE_6 - .toLocalizedString(new Object[] {this.diskFile.getPath(), - Long.valueOf(this.oplogId), Long.valueOf(offsetInOplog), - Long.valueOf(this.crf.currSize), Long.valueOf(this.crf.bytesFlushed), - Boolean.valueOf(!dr.isSync()), Boolean.valueOf(false)}), + .toLocalizedString(this.diskFile.getPath(), this.oplogId, offsetInOplog, + this.crf.currSize, this.crf.bytesFlushed, !dr.isSync(), Boolean.FALSE), ex, dr.getName()); } catch (IllegalStateException ex) { checkClosed(); @@ -5600,8 +5489,8 @@ public final class Oplog implements CompactableOplog, Flushable { } /** - * Asif: Extracts the Value byte array & UserBit from the OpLog and inserts it in the wrapper - * Object of type BytesAndBitsForCompactor which is passed + * Extracts the Value byte array & UserBit from the OpLog and inserts it in the wrapper Object of + * type BytesAndBitsForCompactor which is passed * * @param offsetInOplog The starting position from which to read the data in the opLog * @param bitOnly boolean indicating whether the value needs to be extracted along with the @@ -5635,10 +5524,9 @@ public final class Oplog implements CompactableOplog, Flushable { } else { try { synchronized (this.lock/* crf */) { - final long readPosition = offsetInOplog; if (/* * !getParent().isSync() since compactor groups writes && - */(readPosition + valueLength) > this.crf.bytesFlushed && !this.closed) { + */(offsetInOplog + valueLength) > this.crf.bytesFlushed && !this.closed) { flushAllNoSync(true); // fix for bug 41205 } if (!reopenFileIfClosed()) { @@ -5646,25 +5534,19 @@ public final class Oplog implements CompactableOplog, Flushable { } final long writePosition = (this.doneAppending) ? this.crf.bytesFlushed : this.crf.raf.getFilePointer(); - if ((readPosition + valueLength) > writePosition) { + if ((offsetInOplog + valueLength) > writePosition) { throw new DiskAccessException( LocalizedStrings.Oplog_TRIED_TO_SEEK_TO_0_BUT_THE_FILE_LENGTH_IS_1_OPLOG_FILE_OBJECT_USED_FOR_READING_2 - .toLocalizedString( - new Object[] {readPosition + valueLength, writePosition, this.crf.raf}), + .toLocalizedString(offsetInOplog + valueLength, writePosition, this.crf.raf), dr.getName()); - } else if (readPosition < 0) { + } else if (offsetInOplog < 0) { throw new DiskAccessException( - LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1.toLocalizedString( - new Object[] {Long.valueOf(offsetInOplog), this.diskFile.getPath()}), + LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1 + .toLocalizedString(offsetInOplog, this.diskFile.getPath()), dr.getName()); } - // if (this.closed || this.deleted.get()) { - // throw new DiskAccessException("attempting get on " - // + (this.deleted.get() ? "destroyed" : "closed") - // + " oplog #" + getOplogId(), this.owner); - // } try { - this.crf.raf.seek(readPosition); + this.crf.raf.seek(offsetInOplog); this.stats.incOplogSeeks(); byte[] valueBytes = null; if (wrapper.getBytes().length < valueLength) { @@ -5694,14 +5576,8 @@ public final class Oplog implements CompactableOplog, Flushable { getParent().getCancelCriterion().checkCancelInProgress(ex); throw new DiskAccessException( LocalizedStrings.Oplog_FAILED_READING_FROM_0_OPLOG_DETAILS_1_2_3_4_5_6 - .toLocalizedString(new Object[] {this.diskFile.getPath(), - Long.valueOf(this.oplogId), Long.valueOf(offsetInOplog), - Long.valueOf(this.crf.currSize), Long.valueOf(this.crf.bytesFlushed), - Boolean.valueOf(/* - * ! dr . isSync ( ) - * - * @ todo - */false), Boolean.valueOf(false)}), + .toLocalizedString(this.diskFile.getPath(), this.oplogId, offsetInOplog, + this.crf.currSize, this.crf.bytesFlushed, Boolean.FALSE, Boolean.FALSE), ex, dr.getName()); } catch (IllegalStateException ex) { @@ -5956,8 +5832,7 @@ public final class Oplog implements CompactableOplog, Flushable { tlc = 0; } double rv = tlc; - double rvHWM = rvHWMtmp; - if (((rv / rvHWM) * 100) <= parent.getCompactionThreshold()) { + if (((rv / (double) rvHWMtmp) * 100) <= parent.getCompactionThreshold()) { return true; } } else { @@ -6058,7 +5933,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private GemFireCacheImpl getGemFireCache() { + private InternalCache getInternalCache() { return getParent().getCache(); } @@ -6136,7 +6011,7 @@ public final class Oplog implements CompactableOplog, Flushable { return 0; // do this while holding compactorLock } - // Asif:Start with a fresh wrapper on every compaction so that + // Start with a fresh wrapper on every compaction so that // if previous run used some high memory byte array which was // exceptional, it gets garbage collected. long opStart = getStats().getStatTime(); @@ -6199,7 +6074,7 @@ public final class Oplog implements CompactableOplog, Flushable { totalCount++; getStats().endCompactionUpdate(opStart); opStart = getStats().getStatTime(); - // Asif: Check if the value byte array happens to be any of the + // Check if the value byte array happens to be any of the // constant // static byte arrays or references the value byte array of // underlying RegionEntry. @@ -6259,8 +6134,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * This method is called by the async value recovery task to recover the values from the crf if * the keys were recovered from the krf. - * - * @param diskRecoveryStores */ public void recoverValuesIfNeeded(Map<Long, DiskRecoveryStore> diskRecoveryStores) { // Early out if we start closing the parent. @@ -6359,7 +6232,7 @@ public final class Oplog implements CompactableOplog, Flushable { try { DiskEntry.Helper.recoverValue(diskEntry, getOplogId(), diskRecoveryStore, in); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // This region has been destroyed, stop recovering from it. diskRecoveryStores.remove(diskRegionId); } @@ -6417,7 +6290,7 @@ public final class Oplog implements CompactableOplog, Flushable { InternalDataSerializer.writeUnsignedVL(gcVersion, out); } } else { - InternalDataSerializer.writeBoolean(dr.getRVVTrusted(), out); + DataSerializer.writeBoolean(dr.getRVVTrusted(), out); // Otherwise, we will write the version and exception list for each // member Map<VersionSource, RegionVersionHolder> memberToVersion = rvv.getMemberToVersion(); @@ -6437,27 +6310,12 @@ public final class Oplog implements CompactableOplog, Flushable { } } } - byte[] rvvBytes = out.toByteArray(); - return rvvBytes; + return out.toByteArray(); } - // // Comparable code // - // public int compareTo(Oplog o) { - // return getOplogId() - o.getOplogId(); - // } - // public boolean equals(Object o) { - // if (o instanceof Oplog) { - // return compareTo((Oplog)o) == 0; - // } else { - // return false; - // } - // } - // public int hashCode() { - // return getOplogId(); - // } @Override public String toString() { - return "oplog#" + getOplogId() /* + "DEBUG" + System.identityHashCode(this) */; + return "oplog#" + getOplogId(); } /** @@ -6472,10 +6330,6 @@ public final class Oplog implements CompactableOplog, Flushable { return chPrev; } - // //////// Methods used during recovery ////////////// - - // ////////////////////Inner Classes ////////////////////// - private static class OplogFile { public File f; public UninterruptibleRandomAccessFile raf; @@ -6501,25 +6355,16 @@ public final class Oplog implements CompactableOplog, Flushable { } private static String baToString(byte[] ba, int len) { - if (ba == null) + if (ba == null) { return "null"; - StringBuffer sb = new StringBuffer(); + } + StringBuilder sb = new StringBuilder(); for (int i = 0; i < len; i++) { sb.append(ba[i]).append(", "); } return sb.toString(); } - private static String laToString(long[] la) { - if (la == null) - return "null"; - StringBuffer sb = new StringBuffer(); - for (int i = 0; i < la.length; i++) { - sb.append(la[i]).append(", "); - } - return sb.toString(); - } - void serializeVersionTag(VersionHolder tag, DataOutput out) throws IOException { int entryVersion = tag.getEntryVersion(); long regionVersion = tag.getRegionVersion(); @@ -6551,8 +6396,7 @@ public final class Oplog implements CompactableOplog, Flushable { VersionSource versionMember, long timestamp, int dsId) throws IOException { HeapDataOutputStream out = new HeapDataOutputStream(4 + 8 + 4 + 8 + 4, Version.CURRENT); serializeVersionTag(entryVersion, regionVersion, versionMember, timestamp, dsId, out); - byte[] versionsBytes = out.toByteArray(); - return versionsBytes; + return out.toByteArray(); } private void serializeVersionTag(int entryVersion, long regionVersion, @@ -6592,11 +6436,7 @@ public final class Oplog implements CompactableOplog, Flushable { private byte[] versionsBytes; private short gfversion; - // private int entryVersion; - // private long regionVersion; - // private int memberId; // canonicalId of memberID - - public final int getSize() { + public int getSize() { return this.size; } @@ -6607,17 +6447,16 @@ public final class Oplog implements CompactableOplog, Flushable { return sb.toString(); } - private final void write(OplogFile olf, ValueWrapper vw) throws IOException { + private void write(OplogFile olf, ValueWrapper vw) throws IOException { vw.sendTo(olf.writeBuf, Oplog.this); } - private final void write(OplogFile olf, byte[] bytes, int byteLength) throws IOException { + private void write(OplogFile olf, byte[] bytes, int byteLength) throws IOException { int offset = 0; - final int maxOffset = byteLength; ByteBuffer bb = olf.writeBuf; - while (offset < maxOffset) { + while (offset < byteLength) { - int bytesThisTime = maxOffset - offset; + int bytesThisTime = byteLength - offset; boolean needsFlush = false; if (bytesThisTime > bb.remaining()) { needsFlush = true; @@ -6631,7 +6470,7 @@ public final class Oplog implements CompactableOplog, Flushable { } } - private final void writeByte(OplogFile olf, byte v) throws IOException { + private void writeByte(OplogFile olf, byte v) throws IOException { ByteBuffer bb = olf.writeBuf; if (1 > bb.remaining()) { flushNoSync(olf); @@ -6639,7 +6478,7 @@ public final class Oplog implements CompactableOplog, Flushable { bb.put(v); } - private final void writeOrdinal(OplogFile olf, short ordinal) throws IOException { + private void writeOrdinal(OplogFile olf, short ordinal) throws IOException { ByteBuffer bb = olf.writeBuf; if (3 > bb.remaining()) { flushNoSync(olf); @@ -6648,7 +6487,7 @@ public final class Oplog implements CompactableOplog, Flushable { Version.writeOrdinal(bb, ordinal, false); } - private final void writeInt(OplogFile olf, int v) throws IOException { + private void writeInt(OplogFile olf, int v) throws IOException { ByteBuffer bb = olf.writeBuf; if (4 > bb.remaining()) { flushNoSync(olf); @@ -6656,7 +6495,7 @@ public final class Oplog implements CompactableOplog, Flushable { bb.putInt(v); } - private final void writeLong(OplogFile olf, long v) throws IOException { + private void writeLong(OplogFile olf, long v) throws IOException { ByteBuffer bb = olf.writeBuf; if (8 > bb.remaining()) { flushNoSync(olf); @@ -7001,9 +6840,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * returns the number of entries cleared - * - * @param rvv - * @param pendingKrfTags */ public synchronized int clear(RegionVersionVector rvv, Map<DiskEntry, VersionHolder> pendingKrfTags) { @@ -7023,8 +6859,6 @@ public final class Oplog implements CompactableOplog, Flushable { /** * Clear using an RVV. Remove live entries that are contained within the clear RVV. - * - * @param pendingKrfTags */ private int clearWithRVV(RegionVersionVector rvv, Map<DiskEntry, VersionTag> pendingKrfTags) { // TODO this doesn't work, because we can end up removing entries from @@ -7033,35 +6867,6 @@ public final class Oplog implements CompactableOplog, Flushable { // behavior // until I fix the region map code. return 0; - // int result = 0; - // DiskEntry n = getNext(); - // while (n != this) { - // DiskEntry nextEntry = n.getNext(); - // VersionSource member = null; - // long version = -1; - // if(pendingKrfTags != null) { - // VersionTag tag = pendingKrfTags.get(n); - // if(tag != null) { - // member = tag.getMemberID(); - // version = tag.getRegionVersion(); - // } - // } - // if(member == null) { - // VersionStamp stamp = n.getVersionStamp(); - // member = stamp.getMemberID(); - // version = stamp.getRegionVersion(); - // } - // - // if(rvv.contains(member, version)) { - // result++; - // remove(n); - // if(pendingKrfTags != null) { - // pendingKrfTags.remove(n); - // } - // } - // n = nextEntry; - // } - // return result; } /** @@ -7127,23 +6932,6 @@ public final class Oplog implements CompactableOplog, Flushable { } } - // private synchronized void checkForDuplicate(DiskEntry v) { - // DiskEntry de = getPrev(); - // final long newKeyId = v.getDiskId().getKeyId(); - // while (de != this) { - // if (de.getDiskId().getKeyId() == newKeyId) { - // throw new IllegalStateException( - // "DEBUG: found duplicate for oplogKeyId=" + newKeyId + " de=" - // + System.identityHashCode(v) + " ode=" - // + System.identityHashCode(de) + " deKey=" + v.getKey() - // + " odeKey=" + de.getKey() + " deOffset=" - // + v.getDiskId().getOffsetInOplog() + " odeOffset=" - // + de.getDiskId().getOffsetInOplog()); - // } - // de = de.getPrev(); - // } - // } - @Override public Object getKey() { throw new IllegalStateException(); @@ -7214,7 +7002,6 @@ public final class Oplog implements CompactableOplog, Flushable { * @param liveEntries the array to fill with the live entries * @param idx the first free slot in liveEntries * @param drv the disk region these entries are on - * @param pendingKrfTags * @return the next free slot in liveEntries */ public synchronized int addLiveEntriesToList(KRFEntry[] liveEntries, int idx, @@ -7597,11 +7384,11 @@ public final class Oplog implements CompactableOplog, Flushable { public abstract long clear(RegionVersionVector rvv); - final public DiskRegionView getDiskRegion() { + public DiskRegionView getDiskRegion() { return this.dr; } - final public void setDiskRegion(DiskRegionView dr) { + public void setDiskRegion(DiskRegionView dr) { this.dr = dr; } @@ -7614,11 +7401,11 @@ public final class Oplog implements CompactableOplog, Flushable { return result; } - final synchronized public boolean getUnrecovered() { + synchronized public boolean getUnrecovered() { return this.unrecovered; } - final synchronized public boolean testAndSetRecovered(DiskRegionView dr) { + synchronized public boolean testAndSetRecovered(DiskRegionView dr) { boolean result = this.unrecovered; if (result) { this.unrecovered = false; @@ -7773,9 +7560,8 @@ public final class Oplog implements CompactableOplog, Flushable { public int addLiveEntriesToList(KRFEntry[] liveEntries, int idx) { synchronized (liveEntries) { - int result = this.liveEntries.addLiveEntriesToList(liveEntries, idx, getDiskRegion(), + return this.liveEntries.addLiveEntriesToList(liveEntries, idx, getDiskRegion(), pendingKrfTags); - return result; } } @@ -7816,8 +7602,10 @@ public final class Oplog implements CompactableOplog, Flushable { * range. */ static class OplogEntryIdMap { + private final Int2ObjectOpenHashMap ints = new Int2ObjectOpenHashMap((int) DiskStoreImpl.INVALID_ID); + private final Long2ObjectOpenHashMap longs = new Long2ObjectOpenHashMap((int) DiskStoreImpl.INVALID_ID);