Repository: ignite Updated Branches: refs/heads/ignite-5322 [created] 3cd2cec57
IGNITE-5322 - WAL iterator improvements Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cd2cec5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cd2cec5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cd2cec5 Branch: refs/heads/ignite-5322 Commit: 3cd2cec57cfbdaee8551939aec05b7e173926f93 Parents: c6313b7 Author: Alexey Goncharuk <[email protected]> Authored: Tue May 30 12:09:01 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue May 30 12:09:01 2017 +0300 ---------------------------------------------------------------------- .../wal/record/StoreOperationRecord.java | 118 -------------- .../internal/pagemem/wal/record/WALRecord.java | 12 +- .../GridCacheDatabaseSharedManager.java | 6 +- .../cache/database/wal/FileWALPointer.java | 17 +- .../database/wal/FileWriteAheadLogManager.java | 130 ++++++++------- .../cache/database/wal/RecordSerializer.java | 3 +- .../wal/serializer/RecordV1Serializer.java | 159 ++++++++++--------- .../db/file/IgniteWalRecoverySelfTest.java | 2 +- 8 files changed, 182 insertions(+), 265 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java deleted file mode 100644 index a82f604..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/StoreOperationRecord.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.pagemem.wal.record; - -/** - * - */ -public class StoreOperationRecord extends WALRecord { - /** - * Store operation type. - */ - public enum StoreOperationType { - /** */ - ENTRY_CREATE, - - /** */ - INDEX_PUT, - - /** */ - INDEX_REMOVE; - - /** */ - private static final StoreOperationType[] VALS = StoreOperationType.values(); - - /** */ - public static StoreOperationType fromOrdinal(int ord) { - return ord < 0 || ord >= VALS.length ? null : VALS[ord]; - } - } - - /** */ - private StoreOperationType opType; - - /** */ - private int cacheId; - - /** */ - private long link; - - /** */ - private int idxId; - - /** {@inheritDoc} */ - @Override public RecordType type() { - return RecordType.STORE_OPERATION_RECORD; - } - - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - - /** - * @return Link to data. - */ - public long link() { - return link; - } - - /** - * @return Index ID. - */ - public int indexId() { - return idxId; - } - - /** - * @return Operation type. - */ - public StoreOperationType operationType() { - return opType; - } - - /** - * @param opType Operation type. - */ - public void operationType(StoreOperationType opType) { - this.opType = opType; - } - - /** - * @param cacheId Cache ID. 
- */ - public void cacheId(int cacheId) { - this.cacheId = cacheId; - } - - /** - * @param link Link. - */ - public void link(long link) { - this.link = link; - } - - /** - * @param idxId Index ID. - */ - public void indexId(int idxId) { - this.idxId = idxId; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 142f0ee..b76bcc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.pagemem.wal.record; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -38,9 +39,6 @@ public abstract class WALRecord { DATA_RECORD, /** */ - STORE_OPERATION_RECORD, - - /** */ CHECKPOINT_RECORD, /** */ @@ -186,7 +184,7 @@ public abstract class WALRecord { private WALRecord prev; /** */ - private long pos; + private WALPointer pos; /** * @param chainSize Chain size in bytes. @@ -219,15 +217,15 @@ public abstract class WALRecord { /** * @return Position in file. */ - public long position() { + public WALPointer position() { return pos; } /** * @param pos Position in file. 
*/ - public void position(long pos) { - assert pos >= 0: pos; + public void position(WALPointer pos) { + assert pos != null; this.pos = pos; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index a78ba27..c57f9cb 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -1232,7 +1232,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } - ByteBuffer buf = ByteBuffer.allocate(16); + ByteBuffer buf = ByteBuffer.allocate(20); buf.order(ByteOrder.nativeOrder()); if (startFile != null) @@ -1260,7 +1260,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan buf.flip(); - return new FileWALPointer(buf.getInt(), buf.getInt(), buf.getInt()); + return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt()); } catch (IOException e) { throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " + @@ -1728,7 +1728,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tmpWriteBuf.rewind(); - tmpWriteBuf.putInt(filePtr.index()); + tmpWriteBuf.putLong(filePtr.index()); tmpWriteBuf.putInt(filePtr.fileOffset()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java 
---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java index 1102054..36df2e7 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWALPointer.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; */ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { /** */ - private final int idx; + private final long idx; /** */ private final int fileOffset; @@ -37,7 +37,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { * @param idx File timestamp index. * @param fileOffset Offset in file, from the beginning. */ - public FileWALPointer(int idx, int fileOffset, int len) { + public FileWALPointer(long idx, int fileOffset, int len) { this.idx = idx; this.fileOffset = fileOffset; this.len = len; @@ -46,7 +46,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { /** * @return Timestamp index. */ - public int index() { + public long index() { return idx; } @@ -64,6 +64,13 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { return len; } + /** + * @param len Record length. 
+ */ + public void length(int len) { + this.len = len; + } + /** {@inheritDoc} */ @Override public WALPointer next() { if (len == 0) @@ -89,7 +96,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { /** {@inheritDoc} */ @Override public int hashCode() { - int result = idx; + int result = (int)(idx ^ (idx >>> 32)); result = 31 * result + fileOffset; @@ -98,7 +105,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { /** {@inheritDoc} */ @Override public int compareTo(FileWALPointer o) { - int res = Integer.compare(idx, o.idx); + int res = Long.compare(idx, o.idx); return res == 0 ? Integer.compare(fileOffset, o.fileOffset) : res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java index f8b18ef..4b79308 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java @@ -192,7 +192,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert dbCfg != null : "WAL should not be created if persistence is disabled."; this.dbCfg = dbCfg; - this.igCfg = igCfg; maxWalSegmentSize = dbCfg.getWalSegmentSize(); @@ -443,7 +442,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl archiver0.release(((FileWALPointer)start).index()); } - private boolean hasIndex(int absIdx) { + private boolean hasIndex(long absIdx) { String name = 
FileDescriptor.fileName(absIdx, serializer.version()); boolean inArchive = new File(walArchiveDir, name).exists(); @@ -509,7 +508,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param consId Local node consistent ID. * @param msg File description to print out on successful initialization. * @return Initialized directory. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed to initialize directory. */ private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException { File dir; @@ -568,11 +567,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IgniteCheckedException If failed to initialize WAL write handle. */ private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { - int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); + long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); archiver.currentWalIndex(absIdx); - int segNo = absIdx % dbCfg.getWalSegments(); + long segNo = absIdx % dbCfg.getWalSegments(); File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version())); @@ -625,7 +624,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws StorageException If IO exception occurred. * @throws IgniteCheckedException If failed. */ - private FileWriteHandle initNextWriteHandle(int curIdx) throws StorageException, IgniteCheckedException { + private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException { try { File nextFile = pollNextFile(curIdx); @@ -756,11 +755,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return File ready for use as new WAL segment. * @throws IgniteCheckedException If failed. 
*/ - private File pollNextFile(int curIdx) throws IgniteCheckedException { + private File pollNextFile(long curIdx) throws IgniteCheckedException { // Signal to archiver that we are done with the segment and it can be archived. - int absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); + long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); - int segmentIdx = absNextIdx % dbCfg.getWalSegments(); + long segmentIdx = absNextIdx % dbCfg.getWalSegments(); return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version())); } @@ -827,22 +826,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>. * Incremented during rollover. Also may be directly set if WAL is resuming logging after start. */ - private int curAbsWalIdx = -1; + private long curAbsWalIdx = -1; /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */ - private int lastAbsArchivedIdx = -1; + private long lastAbsArchivedIdx = -1; /** current thread stopping advice */ private volatile boolean stopped; /** */ - private NavigableMap<Integer, Integer> reserved = new TreeMap<>(); + private NavigableMap<Long, Integer> reserved = new TreeMap<>(); /** * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. */ - private Map<Integer, Integer> locked = new HashMap<>(); + private Map<Long, Integer> locked = new HashMap<>(); /** * @@ -869,7 +868,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param curAbsWalIdx Current absolute WAL segment index. 
*/ - private void currentWalIndex(int curAbsWalIdx) { + private void currentWalIndex(long curAbsWalIdx) { synchronized (this) { this.curAbsWalIdx = curAbsWalIdx; @@ -880,7 +879,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param absIdx Index for reservation. */ - private synchronized void reserve(int absIdx) { + private synchronized void reserve(long absIdx) { Integer cur = reserved.get(absIdx); if (cur == null) @@ -893,14 +892,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param absIdx Index for reservation. * @return {@code True} if index is reserved. */ - private synchronized boolean reserved(int absIdx) { + private synchronized boolean reserved(long absIdx) { return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null; } /** * @param absIdx Reserved index. */ - private synchronized void release(int absIdx) { + private synchronized void release(long absIdx) { Integer cur = reserved.get(absIdx); assert cur != null && cur >= 1 : cur; @@ -937,7 +936,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } while (!Thread.currentThread().isInterrupted() && !stopped) { - int toArchive; + long toArchive; synchronized (this) { assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx + @@ -991,7 +990,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return Next index (curIdx+1) when it is ready to be written. * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread). 
*/ - private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException { + private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException { try { synchronized (this) { if (cleanException != null) @@ -1022,7 +1021,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return {@code True} if can read, {@code false} if work segment */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private boolean checkCanReadArchiveOrReserveWorkSegment(int absIdx) { + private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { synchronized (this) { if (lastAbsArchivedIdx >= absIdx) return true; @@ -1044,7 +1043,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param absIdx Segment absolute index. */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private void releaseWorkSegment(int absIdx) { + private void releaseWorkSegment(long absIdx) { synchronized (this) { Integer cur = locked.get(absIdx); @@ -1070,8 +1069,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param absIdx Absolute index to archive. */ - private File archiveSegment(int absIdx) throws IgniteCheckedException { - int segIdx = absIdx % dbCfg.getWalSegments(); + private File archiveSegment(long absIdx) throws IgniteCheckedException { + long segIdx = absIdx % dbCfg.getWalSegments(); File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version())); @@ -1187,7 +1186,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl protected final File file; /** Absolute WAL segment file index */ - protected final int idx; + protected final long idx; /** */ protected final int ver; @@ -1203,7 +1202,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param file File. * @param idx Absolute WAL segment file index. 
*/ - private FileDescriptor(File file, Integer idx) { + private FileDescriptor(File file, Long idx) { this.file = file; String fileName = file.getName(); @@ -1218,7 +1217,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length(); if (idx == null) - this.idx = Integer.parseInt(fileName.substring(0, v)); + this.idx = Long.parseLong(fileName.substring(0, v)); else this.idx = idx; @@ -1280,7 +1279,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public int hashCode() { - return idx; + return (int)(idx ^ (idx >>> 32)); } } @@ -1295,7 +1294,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl protected FileChannel ch; /** */ - protected final int idx; + protected final long idx; /** */ protected String gridName; @@ -1304,7 +1303,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param file File. * @param idx Index. 
*/ - private FileHandle(RandomAccessFile file, int idx, String gridName) { + private FileHandle(RandomAccessFile file, long idx, String gridName) { this.file = file; this.idx = idx; this.gridName = gridName; @@ -1333,7 +1332,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private ReadFileHandle( RandomAccessFile file, - int idx, + long idx, String gridName, RecordSerializer ser, FileInput in @@ -1413,7 +1412,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private FileWriteHandle( RandomAccessFile file, - int idx, + long idx, String gridName, long pos, long maxSegmentSize, @@ -1428,7 +1427,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl this.maxSegmentSize = maxSegmentSize; this.serializer = serializer; - head.set(new FakeRecord(pos)); + head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0))); written = pos; lastFsyncPos = pos; } @@ -1469,10 +1468,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl rec.chainSize(newChainSize); rec.previous(h); - rec.position(nextPos); + + FileWALPointer ptr = new FileWALPointer(idx, (int)nextPos, rec.size()); + + rec.position(ptr); if (head.compareAndSet(h, rec)) - return new FileWALPointer(idx, (int)rec.position(), rec.size()); + return ptr; } } @@ -1481,7 +1483,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return Position for the next record. */ private long nextPosition(WALRecord rec) { - return rec.position() + rec.size(); + return recordOffset(rec) + rec.size(); } /** @@ -1501,7 +1503,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl expWritten = ptr.fileOffset(); } else // We read head position before the flush because otherwise we can get wrong position. 
- expWritten = head.get().position(); + expWritten = recordOffset(head.get()); if (flush(ptr)) return; @@ -1565,7 +1567,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return Chain begin position. */ private long chainBeginPosition(WALRecord h) { - return h.position() + h.size() - h.chainSize(); + return recordOffset(h) + h.size() - h.chainSize(); } /** @@ -1583,7 +1585,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Fail-fast before CAS. checkEnvironment(); - if (!head.compareAndSet(expHead, new FakeRecord(nextPosition(expHead)))) + if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0)))) return false; // At this point we grabbed the piece of WAL chain. @@ -1658,7 +1660,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl buf.rewind(); buf.limit(limit); - return head.position(); + return recordOffset(head); } /** @@ -1951,6 +1953,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Gets WAL record offset relative to the WAL segment file beginning. + * + * @param rec WAL record. + * @return File offset. + */ + private static int recordOffset(WALRecord rec) { + FileWALPointer ptr = (FileWALPointer)rec.position(); + + assert ptr != null; + + return ptr.fileOffset(); + } + + /** * Fake record is zero-sized record, which is not stored into file. * Fake record is used for storing position in file {@link WALRecord#position()}. * Fake record is allowed to have no previous record. @@ -1959,7 +1975,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param pos Position. 
*/ - FakeRecord(long pos) { + FakeRecord(FileWALPointer pos) { position(pos); } @@ -2009,7 +2025,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private IgniteBiTuple<WALPointer, WALRecord> curRec; /** */ - private int curIdx = -1; + private long curIdx = -1; /** */ private ReadFileHandle curHandle; @@ -2115,7 +2131,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } if (curIdx == -1) { - int lastArchived = descs[descs.length - 1].idx; + long lastArchived = descs[descs.length - 1].idx; if (lastArchived > start.index()) throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start); @@ -2159,9 +2175,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @throws IgniteCheckedException If failed. + * */ - private void advanceRecord() throws IgniteCheckedException { + private void advanceRecord() { try { ReadFileHandle hnd = curHandle; @@ -2170,15 +2186,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int pos = (int)hnd.in.position(); - WALRecord rec = ser.readRecord(hnd.in); + FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0); - WALPointer ptr = new FileWALPointer(hnd.idx, pos, rec.size()); + WALRecord rec = ser.readRecord(hnd.in, ptr); - curRec = new IgniteBiTuple<>(ptr, rec); + ptr.length(rec.size()); + + curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec); } } catch (IOException | IgniteCheckedException e) { - // TODO: verify that wrapped IntegrityException is acceptable in this case. 
+ if (!(e instanceof SegmentEofException)) { + if (log.isInfoEnabled()) + log.info("Stopping WAL iteration due to an exception: " + e.getMessage()); + } + curRec = null; } } @@ -2213,7 +2235,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor.fileName(curIdx, serializer.version()))); } else { - int workIdx = curIdx % dbCfg.getWalSegments(); + long workIdx = curIdx % dbCfg.getWalSegments(); fd = new FileDescriptor( new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())), @@ -2257,9 +2279,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { RecordSerializer ser = forVersion(cctx, desc.ver); - FileInput in = new FileInput(rf.getChannel(), buf); + FileChannel channel = rf.getChannel(); + FileInput in = new FileInput(channel, buf); - WALRecord rec = ser.readRecord(in); + WALRecord rec = ser.readRecord(in, + new FileWALPointer(desc.idx, (int)channel.position(), 0)); if (rec == null) return null; @@ -2314,14 +2338,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * archived yet. In this case the corresponding work segment is reserved (will not be deleted until * release). */ - private boolean canReadArchiveOrReserveWork(int absIdx) { + private boolean canReadArchiveOrReserveWork(long absIdx) { return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); } /** * @param absIdx Absolute index to release. 
*/ - private void releaseWorkSegment(int absIdx) { + private void releaseWorkSegment(long absIdx) { if (archiver != null) archiver.releaseWorkSegment(absIdx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java index e3a972a..c929789 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/RecordSerializer.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.database.wal; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; /** @@ -47,5 +48,5 @@ public interface RecordSerializer { * @param in Data input to read data from. * @return Read entry. 
*/ - public WALRecord readRecord(FileInput in) throws IOException, IgniteCheckedException; + public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java index f67f617..442c08d 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java @@ -30,6 +30,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CacheState; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -37,8 +38,6 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry; import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; -import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord; -import org.apache.ignite.internal.pagemem.wal.record.StoreOperationRecord.StoreOperationType; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; @@ -97,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.database.wal.record.HeaderRec import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.util.typedef.F; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; @@ -140,6 +140,8 @@ public class RecordV1Serializer implements RecordSerializer { buf.put((byte)(record.type().ordinal() + 1)); + putPosition(buf, (FileWALPointer)record.position()); + switch (record.type()) { case PAGE_RECORD: PageSnapshot snap = (PageSnapshot)record; @@ -150,16 +152,6 @@ public class RecordV1Serializer implements RecordSerializer { break; - case STORE_OPERATION_RECORD: - StoreOperationRecord storeRec = (StoreOperationRecord)record; - - buf.put((byte)storeRec.operationType().ordinal()); - buf.putInt(storeRec.cacheId()); - buf.putLong(storeRec.link()); - buf.putInt(storeRec.indexId()); - - break; - case MEMORY_RECOVERY: MemoryRecoveryRecord memoryRecoveryRecord = (MemoryRecoveryRecord)record; @@ -217,7 +209,7 @@ public class RecordV1Serializer implements RecordSerializer { buf.put(walPtr == null ? 
(byte)0 : 1); if (walPtr != null) { - buf.putInt(walPtr.index()); + buf.putLong(walPtr.index()); buf.putInt(walPtr.fileOffset()); buf.putInt(walPtr.length()); } @@ -651,13 +643,13 @@ public class RecordV1Serializer implements RecordSerializer { } /** {@inheritDoc} */ - @Override public WALRecord readRecord(FileInput in0) throws IOException, IgniteCheckedException { + @Override public WALRecord readRecord(FileInput in0, WALPointer expPtr) throws IOException, IgniteCheckedException { long startPos = -1; try (FileInput.Crc32CheckingFileInput in = in0.startRead(skipCrc)) { startPos = in0.position(); - WALRecord res = readRecord(in); + WALRecord res = readRecord(in, expPtr); assert res != null; @@ -676,12 +668,18 @@ public class RecordV1Serializer implements RecordSerializer { /** * @param in In. */ - private WALRecord readRecord(ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException { + private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException { int type = in.readUnsignedByte(); if (type == 0) throw new SegmentEofException("Reached logical end of the segment", null); + FileWALPointer ptr = readPosition(in); + + if (!F.eq(ptr, expPtr)) + throw new SegmentEofException("WAL segment rollover detected (will end iteration) [expPtr=" + expPtr + + ", readPtr=" + ptr + ']', null); + RecordType recType = RecordType.fromOrdinal(type - 1); if (recType == null) @@ -702,18 +700,6 @@ public class RecordV1Serializer implements RecordSerializer { break; - case STORE_OPERATION_RECORD: - StoreOperationRecord storeRec = new StoreOperationRecord(); - - storeRec.operationType(StoreOperationType.fromOrdinal(in.readByte() & 0xFF)); - storeRec.cacheId(in.readInt()); - storeRec.link(in.readLong()); - storeRec.indexId(in.readInt()); - - res = storeRec; - - break; - case CHECKPOINT_RECORD: long msb = in.readLong(); long lsb = in.readLong(); @@ -1217,16 +1203,15 @@ public class RecordV1Serializer implements 
RecordSerializer { /** {@inheritDoc} */ @SuppressWarnings("CastConflictsWithInstanceof") @Override public int size(WALRecord record) throws IgniteCheckedException { + int commonFields = /* Type */1 + /* Pointer */12 + /*CRC*/4; + switch (record.type()) { case PAGE_RECORD: assert record instanceof PageSnapshot; PageSnapshot pageRec = (PageSnapshot)record; - return pageRec.pageData().length + 12 + 1 + 4; - - case STORE_OPERATION_RECORD: - return 18 + 4; + return commonFields + pageRec.pageData().length + 12; case CHECKPOINT_RECORD: CheckpointRecord cpRec = (CheckpointRecord)record; @@ -1238,147 +1223,146 @@ public class RecordV1Serializer implements RecordSerializer { FileWALPointer walPtr = (FileWALPointer)cpRec.checkpointMark(); - return 19 + cacheStatesSize + (walPtr == null ? 0 : 12) + 4; + return commonFields + 18 + cacheStatesSize + (walPtr == null ? 0 : 16); case META_PAGE_INIT: - return 1 + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8 + /*CRC*/4; + return commonFields + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2 + /*ioVer*/2 + /*tree root*/8 + /*reuse root*/8; case PARTITION_META_PAGE_UPDATE_COUNTERS: - return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1 - + /*allocatedIdxCandidate*/ 4 + /*CRC*/4; + return commonFields + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part size*/4 + /*state*/ 1 + + /*allocatedIdxCandidate*/ 4; case MEMORY_RECOVERY: - return 1 + 8 + 4; + return commonFields + 8; case PARTITION_DESTROY: - return 1 + /*cacheId*/4 + /*partId*/4 + /*CRC*/4; + return commonFields + /*cacheId*/4 + /*partId*/4; case DATA_RECORD: DataRecord dataRec = (DataRecord)record; - return 5 + dataSize(dataRec) + 4; + return commonFields + 4 + dataSize(dataRec); case HEADER_RECORD: - return 13 + 4; + return commonFields + 12; case DATA_PAGE_INSERT_RECORD: DataPageInsertRecord diRec = (DataPageInsertRecord)record; - return 1 + 4 + 8 + 2 + - 
diRec.payload().length + 4; + return commonFields + 4 + 8 + 2 + diRec.payload().length; case DATA_PAGE_UPDATE_RECORD: DataPageUpdateRecord uRec = (DataPageUpdateRecord)record; - return 1 + 4 + 8 + 2 + 4 + - uRec.payload().length + 4; + return commonFields + 4 + 8 + 2 + 4 + + uRec.payload().length; case DATA_PAGE_INSERT_FRAGMENT_RECORD: final DataPageInsertFragmentRecord difRec = (DataPageInsertFragmentRecord)record; - return 1 + 4 + 8 + 8 + 4 + difRec.payloadSize() + 4; + return commonFields + 4 + 8 + 8 + 4 + difRec.payloadSize(); case DATA_PAGE_REMOVE_RECORD: - return 1 + 4 + 8 + 1 + 4; + return commonFields + 4 + 8 + 1; case DATA_PAGE_SET_FREE_LIST_PAGE: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case INIT_NEW_PAGE_RECORD: - return 1 + 4 + 8 + 2 + 2 + 8 + 4; + return commonFields + 4 + 8 + 2 + 2 + 8; case BTREE_META_PAGE_INIT_ROOT: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case BTREE_META_PAGE_INIT_ROOT2: - return 1 + 4 + 8 + 8 + 4 + 2; + return commonFields + 4 + 8 + 8 + 2; case BTREE_META_PAGE_ADD_ROOT: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case BTREE_META_PAGE_CUT_ROOT: - return 1 + 4 + 8 + 4; + return commonFields + 4 + 8; case BTREE_INIT_NEW_ROOT: NewRootInitRecord<?> riRec = (NewRootInitRecord<?>)record; - return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize() + 4; + return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 8 + riRec.io().getItemSize(); case BTREE_PAGE_RECYCLE: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case BTREE_PAGE_INSERT: InsertRecord<?> inRec = (InsertRecord<?>)record; - return 1 + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize() + 4; + return commonFields + 4 + 8 + 2 + 2 + 2 + 8 + inRec.io().getItemSize(); case BTREE_FIX_LEFTMOST_CHILD: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case BTREE_FIX_COUNT: - return 1 + 4 + 8 + 2 + 4; + return commonFields + 4 + 8 + 2; case BTREE_PAGE_REPLACE: ReplaceRecord<?> rRec = 
(ReplaceRecord<?>)record; - return 1 + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize() + 4; + return commonFields + 4 + 8 + 2 + 2 + 2 + rRec.io().getItemSize(); case BTREE_PAGE_REMOVE: - return 1 + 4 + 8 + 2 + 2 + 4; + return commonFields + 4 + 8 + 2 + 2; case BTREE_PAGE_INNER_REPLACE: - return 1 + 4 + 8 + 2 + 8 + 2 + 8 + 4; + return commonFields + 4 + 8 + 2 + 8 + 2 + 8; case BTREE_FORWARD_PAGE_SPLIT: - return 1 + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2 + 4; + return commonFields + 4 + 8 + 8 + 2 + 2 + 8 + 2 + 2; case BTREE_EXISTING_PAGE_SPLIT: - return 1 + 4 + 8 + 2 + 8 + 4; + return commonFields + 4 + 8 + 2 + 8; case BTREE_PAGE_MERGE: - return 1 + 4 + 8 + 8 + 2 + 8 + 1 + 4; + return commonFields + 4 + 8 + 8 + 2 + 8 + 1; case BTREE_FIX_REMOVE_ID: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case PAGES_LIST_SET_NEXT: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case PAGES_LIST_SET_PREVIOUS: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case PAGES_LIST_INIT_NEW_PAGE: - return 1 + 4 + 8 + 4 + 4 + 8 + 8 + 8 + 4; + return commonFields + 4 + 8 + 4 + 4 + 8 + 8 + 8; case PAGES_LIST_ADD_PAGE: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case PAGES_LIST_REMOVE_PAGE: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case TRACKING_PAGE_DELTA: - return 1 + 4 + 8 + 8 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8 + 8 + 8; case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID: - return 1 + 4 + 8 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8 + 8; case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID: - return 1 + 4 + 8 + 8 + 4; + return commonFields + 4 + 8 + 8; case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX: - return 1 + 4 + 8 + 4 + 4; + return commonFields + 4 + 8 + 4; case PART_META_UPDATE_STATE: - return /*Type*/ 1 + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8 + /*CRC*/4; + return 
commonFields + /*cacheId*/ 4 + /*partId*/ 4 + /*State*/1 + /*Update Counter*/ 8; case PAGE_LIST_META_RESET_COUNT_RECORD: - return /*Type*/ 1 + /*cacheId*/ 4 + /*pageId*/ 8 + /*CRC*/4; + return commonFields + /*cacheId*/ 4 + /*pageId*/ 8; case SWITCH_SEGMENT_RECORD: - return /*Type*/ 1 + /*CRC*/4; + return commonFields; default: throw new UnsupportedOperationException("Type: " + record.type()); @@ -1386,6 +1370,27 @@ public class RecordV1Serializer implements RecordSerializer { } /** + * @param buf Byte buffer to serialize the pointer to. + * @param ptr File WAL pointer to write. + */ + private void putPosition(ByteBuffer buf, FileWALPointer ptr) { + buf.putLong(ptr.index()); + buf.putInt(ptr.fileOffset()); + } + + /** + * @param in Data input to read pointer from. + * @return Read file WAL pointer. + * @throws IOException If failed to read. + */ + private FileWALPointer readPosition(DataInput in) throws IOException { + long idx = in.readLong(); + int fileOffset = in.readInt(); + + return new FileWALPointer(idx, fileOffset, 0); + } + + /** * @param dataRec Data record to serialize. * @return Full data record size. * @throws IgniteCheckedException If failed to obtain the length of one of the entries. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd2cec5/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java index bdf333c..225a9d2 100644 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java @@ -464,7 +464,7 @@ public class IgniteWalRecoverySelfTest extends GridCommonAbstractTest { walSegmentSize = 2 * 1024 * 1024; - final long endTime = System.currentTimeMillis() + 3 * 60 * 1000; + final long endTime = System.currentTimeMillis() + 2 * 60 * 1000; try { IgniteEx ignite = startGrid(1);
