Repository: ignite
Updated Branches:
  refs/heads/ignite-5323 [created] 9277d6494


IGNITE-5323 - WAL serializer version switch


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9277d649
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9277d649
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9277d649

Branch: refs/heads/ignite-5323
Commit: 9277d6494f88211488e15691492a54d1fc0dbe2e
Parents: c6313b7
Author: Alexey Goncharuk <[email protected]>
Authored: Wed May 31 13:25:29 2017 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed May 31 13:25:29 2017 +0300

----------------------------------------------------------------------
 .../database/wal/FileWriteAheadLogManager.java  | 105 ++++++++++++-------
 .../wal/serializer/RecordV1Serializer.java      |  13 ++-
 .../db/file/IgniteWalRecoverySelfTest.java      |   2 +-
 3 files changed, 80 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
 
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
index f8b18ef..e6b2db9 100644
--- 
a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
+++ 
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/FileWriteAheadLogManager.java
@@ -79,10 +79,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static final byte[] FILL_BUF = new byte[1024 * 1024];
 
     /** */
-    private static final Pattern WAL_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.v\\d+\\.wal");
+    private static final Pattern WAL_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.wal");
 
     /** */
-    private static final Pattern WAL_TEMP_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.v\\d+\\.wal\\.tmp");
+    private static final Pattern WAL_TEMP_NAME_PATTERN = 
Pattern.compile("\\d{16}\\.wal\\.tmp");
 
     /** */
     private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() 
{
@@ -323,6 +323,15 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             currentHnd = restoreWriteHandle(filePtr);
 
+            if (currentHnd.serializer.version() != serializer.version()) {
+                if (log.isInfoEnabled())
+                    log.info("Record serializer version change detected, will 
start logging with a new WAL record " +
+                        "serializer to a new WAL segment [curFile=" + 
currentHnd + ", newVer=" + serializer.version() +
+                        ", oldVer=" + currentHnd.serializer.version() + ']');
+
+                rollOver(currentHnd);
+            }
+
             if (mode == Mode.BACKGROUND) {
                 flusher = new QueueFlusher(cctx.igniteInstanceName());
 
@@ -444,7 +453,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     private boolean hasIndex(int absIdx) {
-        String name = FileDescriptor.fileName(absIdx, serializer.version());
+        String name = FileDescriptor.fileName(absIdx);
 
         boolean inArchive = new File(walArchiveDir, name).exists();
 
@@ -570,29 +579,35 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) 
throws IgniteCheckedException {
         int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
-        archiver.currentWalIndex(absIdx);
-
         int segNo = absIdx % dbCfg.getWalSegments();
 
-        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, 
serializer.version()));
+        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
 
         int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
 
-        log.info("Resuming logging in WAL segment [file=" + 
curFile.getAbsolutePath() +
-            ", offset=" + offset + ']');
-
         try {
             RandomAccessFile file = new RandomAccessFile(curFile, "rw");
 
             try {
+                // readSerializerVersion will change the channel position.
+                // This is fine because the FileWriteHandle consitructor will 
move it
+                // to offset + len anyways.
+                int serVer = readSerializerVersion(file, curFile);
+
+                RecordSerializer ser = forVersion(cctx, serVer);
+
+                if (log.isInfoEnabled())
+                    log.info("Resuming logging to WAL segment [file=" + 
curFile.getAbsolutePath() +
+                        ", offset=" + offset + ", ver=" + serVer + ']');
+
                 FileWriteHandle hnd = new FileWriteHandle(
                     file,
                     absIdx,
                     cctx.igniteInstanceName(),
                     offset + len,
                     maxWalSegmentSize,
-                    serializer);
+                    ser);
 
                 if (lastReadPtr == null) {
                     HeaderRecord header = new 
HeaderRecord(serializer.version());
@@ -602,6 +617,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     hnd.addRecord(header);
                 }
 
+                archiver.currentWalIndex(absIdx);
+
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
@@ -682,7 +699,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         // Allocate the first segment synchronously. All other segments will 
be allocated by archiver in background.
         if (allFiles.length == 0) {
-            File first = new File(walWorkDir, FileDescriptor.fileName(0, 
serializer.version()));
+            File first = new File(walWorkDir, FileDescriptor.fileName(0));
 
             createFile(first);
         }
@@ -762,7 +779,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         int segmentIdx = absNextIdx % dbCfg.getWalSegments();
 
-        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, 
serializer.version()));
+        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
     }
 
     /**
@@ -1073,9 +1090,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private File archiveSegment(int absIdx) throws IgniteCheckedException {
             int segIdx = absIdx % dbCfg.getWalSegments();
 
-            File origFile = new File(walWorkDir, 
FileDescriptor.fileName(segIdx, serializer.version()));
+            File origFile = new File(walWorkDir, 
FileDescriptor.fileName(segIdx));
 
-            String name = FileDescriptor.fileName(absIdx, 
serializer.version());
+            String name = FileDescriptor.fileName(absIdx);
 
             File dstTmpFile = new File(walArchiveDir, name + ".tmp");
 
@@ -1164,7 +1181,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      */
     private void checkFiles(int startWith, boolean create, 
IgnitePredicate<Integer> p) throws IgniteCheckedException {
         for (int i = startWith; i < dbCfg.getWalSegments() && (p == null || (p 
!= null && p.apply(i))); i++) {
-            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i, 
serializer.version()));
+            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
 
             if (checkFile.exists()) {
                 if (checkFile.isDirectory())
@@ -1180,6 +1197,35 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * @param rf Random access file.
+     * @param file File object.
+     * @return Serializer version stored in the file.
+     * @throws IOException If failed to read serializer version.
+     * @throws IgniteCheckedException If failed to read serializer version.
+     */
+    private int readSerializerVersion(RandomAccessFile rf, File file) throws 
IOException, IgniteCheckedException {
+        try {
+            ByteBuffer buf = 
ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
+            buf.order(ByteOrder.nativeOrder());
+
+            FileInput in = new FileInput(rf.getChannel(), buf);
+
+            // Header record must be agnostic to the serializer version.
+            WALRecord rec = serializer.readRecord(in);
+
+            serializer.version();
+
+            if (rec.type() != WALRecord.RecordType.HEADER_RECORD)
+                throw new IOException("Missing file header record: " + 
file.getAbsoluteFile());
+
+            return ((HeaderRecord)rec).version();
+        }
+        catch (SegmentEofException | EOFException ignore) {
+            return serializer.version();
+        }
+    }
+
+    /**
      * WAL file descriptor.
      */
     private static class FileDescriptor implements Comparable<FileDescriptor> {
@@ -1189,9 +1235,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /** Absolute WAL segment file index */
         protected final int idx;
 
-        /** */
-        protected final int ver;
-
         /**
          * @param file File.
          */
@@ -1210,27 +1253,19 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             assert fileName.endsWith(WAL_SEGMENT_FILE_EXT);
 
-            int v = fileName.lastIndexOf(".v");
-
-            assert v > 0;
-
-            int begin = v + 2;
             int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length();
 
             if (idx == null)
-                this.idx = Integer.parseInt(fileName.substring(0, v));
+                this.idx = Integer.parseInt(fileName.substring(0, end));
             else
                 this.idx = idx;
-
-            ver = Integer.parseInt(fileName.substring(begin, end));
         }
 
         /**
          * @param segment Segment index.
-         * @param ver Serializer version.
          * @return Segment file name.
          */
-        private static String fileName(long segment, int ver) {
+        private static String fileName(long segment) {
             SB b = new SB();
 
             String segmentStr = Long.toString(segment);
@@ -1238,7 +1273,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             for (int i = segmentStr.length(); i < 16; i++)
                 b.a('0');
 
-            b.a(segmentStr).a(".v").a(ver).a(WAL_SEGMENT_FILE_EXT);
+            b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT);
 
             return b.toString();
         }
@@ -2210,13 +2245,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             if (readArchive) {
                 fd = new FileDescriptor(new File(walArchiveDir,
-                    FileDescriptor.fileName(curIdx, serializer.version())));
+                    FileDescriptor.fileName(curIdx)));
             }
             else {
                 int workIdx = curIdx % dbCfg.getWalSegments();
 
                 fd = new FileDescriptor(
-                    new File(walWorkDir, FileDescriptor.fileName(workIdx, 
serializer.version())),
+                    new File(walWorkDir, FileDescriptor.fileName(workIdx)),
                     curIdx);
             }
 
@@ -2256,10 +2291,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 RandomAccessFile rf = new RandomAccessFile(desc.file, "r");
 
                 try {
-                    RecordSerializer ser = forVersion(cctx, desc.ver);
                     FileInput in = new FileInput(rf.getChannel(), buf);
 
-                    WALRecord rec = ser.readRecord(in);
+                    // Header record must be agnostic to the serializer 
version.
+                    WALRecord rec = serializer.readRecord(in);
 
                     if (rec == null)
                         return null;
@@ -2269,9 +2304,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     int ver = ((HeaderRecord)rec).version();
 
-                    if (ver != ser.version())
-                        throw new IOException("Unexpected file format version: 
" + ver + ", " +
-                            desc.file.getAbsoluteFile());
+                    RecordSerializer ser = forVersion(cctx, ver);
 
                     if (start != null && desc.idx == start.index())
                         in.seek(start.fileOffset());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
 
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
index f67f617..6f791c5 100644
--- 
a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
+++ 
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
@@ -97,6 +97,7 @@ import 
org.apache.ignite.internal.processors.cache.database.wal.record.HeaderRec
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
@@ -105,6 +106,9 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
  */
 public class RecordV1Serializer implements RecordSerializer {
     /** */
+    public static final int HEADER_RECORD_SIZE = 17;
+
+    /** */
     private GridCacheSharedContext cctx;
 
     /** */
@@ -791,8 +795,11 @@ public class RecordV1Serializer implements 
RecordSerializer {
                 break;
 
             case HEADER_RECORD:
-                if (in.readLong() != HeaderRecord.MAGIC)
-                    throw new EOFException("Magic is corrupted.");
+                long magic = in.readLong();
+
+                if (magic != HeaderRecord.MAGIC)
+                    throw new EOFException("Magic is corrupted [exp=" + 
U.hexLong(HeaderRecord.MAGIC) +
+                        ", actual=" + U.hexLong(magic) + ']');
 
                 int ver = in.readInt();
 
@@ -1259,7 +1266,7 @@ public class RecordV1Serializer implements 
RecordSerializer {
                 return 5 + dataSize(dataRec) + 4;
 
             case HEADER_RECORD:
-                return 13 + 4;
+                return HEADER_RECORD_SIZE;
 
             case DATA_PAGE_INSERT_RECORD:
                 DataPageInsertRecord diRec = (DataPageInsertRecord)record;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9277d649/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
 
b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
index bdf333c..225a9d2 100644
--- 
a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
+++ 
b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySelfTest.java
@@ -464,7 +464,7 @@ public class IgniteWalRecoverySelfTest extends 
GridCommonAbstractTest {
 
         walSegmentSize = 2 * 1024 * 1024;
 
-        final long endTime = System.currentTimeMillis() + 3 * 60 * 1000;
+        final long endTime = System.currentTimeMillis() + 2 * 60 * 1000;
 
         try {
             IgniteEx ignite = startGrid(1);

Reply via email to