Repository: ignite Updated Branches: refs/heads/ignite-5323 [created] 9277d6494
IGNITE-5323 - WAL serializer version switch Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9277d649 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9277d649 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9277d649 Branch: refs/heads/ignite-5323 Commit: 9277d6494f88211488e15691492a54d1fc0dbe2e Parents: c6313b7 Author: Alexey Goncharuk <[email protected]> Authored: Wed May 31 13:25:29 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed May 31 13:25:29 2017 +0300 ---------------------------------------------------------------------- .../database/wal/FileWriteAheadLogManager.java | 105 ++++++++++++------- .../wal/serializer/RecordV1Serializer.java | 13 ++- .../db/file/IgniteWalRecoverySelfTest.java | 2 +- 3 files changed, 80 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java index f8b18ef..e6b2db9 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java @@ -79,10 +79,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final byte[] FILL_BUF = new byte[1024 * 1024]; /** */ - private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal"); + private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); /** */ - private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal\\.tmp"); + private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); /** */ private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { @@ -323,6 +323,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl currentHnd = restoreWriteHandle(filePtr); + if (currentHnd.serializer.version() != serializer.version()) { + if (log.isInfoEnabled()) + log.info("Record serializer version change detected, will start logging with a new WAL record " + + "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + + ", oldVer=" + currentHnd.serializer.version() + ']'); + + rollOver(currentHnd); + } + if (mode == Mode.BACKGROUND) { flusher = new QueueFlusher(cctx.igniteInstanceName()); @@ -444,7 +453,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } private boolean hasIndex(int absIdx) { - String name = FileDescriptor.fileName(absIdx, serializer.version()); + String name = FileDescriptor.fileName(absIdx); boolean inArchive = new File(walArchiveDir, name).exists(); @@ -570,29 +579,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); - archiver.currentWalIndex(absIdx); - int segNo = absIdx % dbCfg.getWalSegments(); - File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version())); + File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); int len = lastReadPtr == null ? 0 : lastReadPtr.length(); - log.info("Resuming logging in WAL segment [file=" + curFile.getAbsolutePath() + - ", offset=" + offset + ']'); - try { RandomAccessFile file = new RandomAccessFile(curFile, "rw"); try { + // readSerializerVersion will change the channel position. + // This is fine because the FileWriteHandle consitructor will move it + // to offset + len anyways. + int serVer = readSerializerVersion(file, curFile); + + RecordSerializer ser = forVersion(cctx, serVer); + + if (log.isInfoEnabled()) + log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + + ", offset=" + offset + ", ver=" + serVer + ']'); + FileWriteHandle hnd = new FileWriteHandle( file, absIdx, cctx.igniteInstanceName(), offset + len, maxWalSegmentSize, - serializer); + ser); if (lastReadPtr == null) { HeaderRecord header = new HeaderRecord(serializer.version()); @@ -602,6 +617,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl hnd.addRecord(header); } + archiver.currentWalIndex(absIdx); + return hnd; } catch (IgniteCheckedException | IOException e) { @@ -682,7 +699,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. if (allFiles.length == 0) { - File first = new File(walWorkDir, FileDescriptor.fileName(0, serializer.version())); + File first = new File(walWorkDir, FileDescriptor.fileName(0)); createFile(first); } @@ -762,7 +779,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int segmentIdx = absNextIdx % dbCfg.getWalSegments(); - return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version())); + return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); } /** @@ -1073,9 +1090,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private File archiveSegment(int absIdx) throws IgniteCheckedException { int segIdx = absIdx % dbCfg.getWalSegments(); - File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version())); + File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); - String name = FileDescriptor.fileName(absIdx, serializer.version()); + String name = FileDescriptor.fileName(absIdx); File dstTmpFile = new File(walArchiveDir, name + ".tmp"); @@ -1164,7 +1181,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException { for (int i = startWith; i < dbCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { - File checkFile = new File(walWorkDir, FileDescriptor.fileName(i, serializer.version())); + File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); if (checkFile.exists()) { if (checkFile.isDirectory()) @@ -1180,6 +1197,35 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * @param rf Random access file. + * @param file File object. + * @return Serializer version stored in the file. + * @throws IOException If failed to read serializer version. + * @throws IgniteCheckedException If failed to read serializer version. + */ + private int readSerializerVersion(RandomAccessFile rf, File file) throws IOException, IgniteCheckedException { + try { + ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); + buf.order(ByteOrder.nativeOrder()); + + FileInput in = new FileInput(rf.getChannel(), buf); + + // Header record must be agnostic to the serializer version. + WALRecord rec = serializer.readRecord(in); + + serializer.version(); + + if (rec.type() != WALRecord.RecordType.HEADER_RECORD) + throw new IOException("Missing file header record: " + file.getAbsoluteFile()); + + return ((HeaderRecord)rec).version(); + } + catch (SegmentEofException | EOFException ignore) { + return serializer.version(); + } + } + + /** * WAL file descriptor. */ private static class FileDescriptor implements Comparable<FileDescriptor> { @@ -1189,9 +1235,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Absolute WAL segment file index */ protected final int idx; - /** */ - protected final int ver; - /** * @param file File. */ @@ -1210,27 +1253,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert fileName.endsWith(WAL_SEGMENT_FILE_EXT); - int v = fileName.lastIndexOf(".v"); - - assert v > 0; - - int begin = v + 2; int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length(); if (idx == null) - this.idx = Integer.parseInt(fileName.substring(0, v)); + this.idx = Integer.parseInt(fileName.substring(0, end)); else this.idx = idx; - - ver = Integer.parseInt(fileName.substring(begin, end)); } /** * @param segment Segment index. - * @param ver Serializer version. * @return Segment file name. */ - private static String fileName(long segment, int ver) { + private static String fileName(long segment) { SB b = new SB(); String segmentStr = Long.toString(segment); @@ -1238,7 +1273,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl for (int i = segmentStr.length(); i < 16; i++) b.a('0'); - b.a(segmentStr).a(".v").a(ver).a(WAL_SEGMENT_FILE_EXT); + b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT); return b.toString(); } @@ -2210,13 +2245,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (readArchive) { fd = new FileDescriptor(new File(walArchiveDir, - FileDescriptor.fileName(curIdx, serializer.version()))); + FileDescriptor.fileName(curIdx))); } else { int workIdx = curIdx % dbCfg.getWalSegments(); fd = new FileDescriptor( - new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())), + new File(walWorkDir, FileDescriptor.fileName(workIdx)), curIdx); } @@ -2256,10 +2291,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); try { - RecordSerializer ser = forVersion(cctx, desc.ver); FileInput in = new FileInput(rf.getChannel(), buf); - WALRecord rec = ser.readRecord(in); + // Header record must be agnostic to the serializer version. + WALRecord rec = serializer.readRecord(in); if (rec == null) return null; @@ -2269,9 +2304,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int ver = ((HeaderRecord)rec).version(); - if (ver != ser.version()) - throw new IOException("Unexpected file format version: " + ver + ", " + - desc.file.getAbsoluteFile()); + RecordSerializer ser = forVersion(cctx, ver); if (start != null && desc.idx == start.index()) in.seek(start.fileOffset()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java index f67f617..6f791c5 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.database.wal.record.HeaderRec import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; @@ -105,6 +106,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC; */ public class RecordV1Serializer implements RecordSerializer { /** */ + public static final int HEADER_RECORD_SIZE = 17; + + /** */ private GridCacheSharedContext cctx; /** */ @@ -791,8 +795,11 @@ public class RecordV1Serializer implements RecordSerializer { break; case HEADER_RECORD: - if (in.readLong() != HeaderRecord.MAGIC) - throw new EOFException("Magic is corrupted."); + long magic = in.readLong(); + + if (magic != HeaderRecord.MAGIC) + throw new EOFException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + + ", actual=" + U.hexLong(magic) + ']'); int ver = in.readInt(); @@ -1259,7 +1266,7 @@ public class RecordV1Serializer implements RecordSerializer { return 5 + dataSize(dataRec) + 4; case HEADER_RECORD: - return 13 + 4; + return HEADER_RECORD_SIZE; case DATA_PAGE_INSERT_RECORD: DataPageInsertRecord diRec = (DataPageInsertRecord)record; http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java index bdf333c..225a9d2 100644 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java @@ -464,7 +464,7 @@ public class IgniteWalRecoverySelfTest extends GridCommonAbstractTest { walSegmentSize = 2 * 1024 * 1024; - final long endTime = System.currentTimeMillis() + 3 * 60 * 1000; + final long endTime = System.currentTimeMillis() + 2 * 60 * 1000; try { IgniteEx ignite = startGrid(1);
