Repository: ignite
Updated Branches:
  refs/heads/master 28f32b885 -> 932692ecc


IGNITE-7865 Supported serializerVersion method for WAL manager - Fixes #3594.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/932692ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/932692ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/932692ec

Branch: refs/heads/master
Commit: 932692ecc7a3e0feda856ae37bd2b2c182274b35
Parents: 28f32b8
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Fri Mar 2 15:11:50 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Mar 2 15:11:50 2018 +0300

----------------------------------------------------------------------
 .../pagemem/wal/IgniteWriteAheadLogManager.java |   5 +
 .../GridDhtPartitionsExchangeFuture.java        |   9 +-
 .../wal/FileWriteAheadLogManager.java           |  10 +-
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 112 ++++++++++++++++---
 .../persistence/pagemem/NoOpWALManager.java     |   5 +
 5 files changed, 116 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 6c3c36e..55806d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -38,6 +38,11 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
     public boolean isFullSync();
 
     /**
+     * @return Current serializer version.
+     */
+    public int serializerVersion();
+
+    /**
      * Resumes logging after start. When WAL manager is started, it will skip 
logging any updates until this
      * method is called to avoid logging changes induced by the state restore 
procedure.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a56c4e2..b4febf7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -81,7 +81,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
 import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -830,7 +829,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     top.update(null,
                         clientTop.partitionMap(true),
                         clientTop.fullUpdateCounters(),
-                        Collections.<Integer>emptySet(),
+                        Collections.emptySet(),
                         null);
                 }
             }
@@ -1694,7 +1693,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      */
     private void logExchange(DiscoveryEvent evt) {
         if (cctx.kernalContext().state().publicApiActiveState(false) && 
cctx.wal() != null) {
-            if (((FileWriteAheadLogManager)cctx.wal()).serializerVersion() > 1)
+            if (cctx.wal().serializerVersion() > 1)
                 try {
                     ExchangeRecord.Type type = null;
 
@@ -2248,7 +2247,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             CounterWithNodes maxCntr = maxCntrs.get(part.id());
 
             if (maxCntr == null && cntr == 0) {
-                CounterWithNodes cntrObj = new CounterWithNodes(cntr, 
cctx.localNodeId());
+                CounterWithNodes cntrObj = new CounterWithNodes(0, 
cctx.localNodeId());
 
                 for (UUID nodeId : msgs.keySet()) {
                     if (top.partitionState(nodeId, part.id()) == 
GridDhtPartitionState.OWNING)
@@ -3087,7 +3086,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     top.update(resTopVer,
                         entry.getValue(),
                         cntrMap,
-                        Collections.<Integer>emptySet(),
+                        Collections.emptySet(),
                         null);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 9bd3bfd..69ee96a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -645,10 +645,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
     }
 
-    /**
-     * @return Latest serializer version.
-     */
-    public int serializerVersion() {
+    /** {@inheritDoc} */
+    @Override public int serializerVersion() {
         return serializerVer;
     }
 
@@ -2304,7 +2302,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /**
          * @return True if segment is ZIP compressed.
          */
-        public boolean isCompressed() {
+        @Override public boolean isCompressed() {
             return file.getName().endsWith(".zip");
         }
 
@@ -2376,7 +2374,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /**
          * @throws IgniteCheckedException If failed to close the WAL segment 
file.
          */
-        public void close() throws IgniteCheckedException {
+        @Override public void close() throws IgniteCheckedException {
             try {
                 fileIO.close();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index de45e3d..4904102 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -30,8 +30,11 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.Files;
 import java.sql.Time;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -44,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
@@ -88,12 +92,14 @@ import 
org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
@@ -416,6 +422,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         if (!cctx.kernalContext().clientNode()) {
             assert archiver != null;
+
             archiver.start();
 
             if (compressor != null)
@@ -517,9 +524,48 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
     }
 
     /**
-     * @return Latest serializer version.
+     * Collects WAL segment files from the low pointer (inclusive) to the high pointer 
(exclusive) and reserves the low pointer.
+     *
+     * @param low Low bound.
+     * @param high High bound.
      */
-    public int serializerVersion() {
+    public Collection<File> getAndReserveWalFiles(FileWALPointer low, 
FileWALPointer high) throws IgniteCheckedException {
+        final long awaitIdx = high.index() - 1;
+
+        while (archiver.lastArchivedAbsoluteIndex() < awaitIdx)
+            LockSupport.parkNanos(Thread.currentThread(), 1_000_000);
+
+        if (!reserve(low))
+            throw new IgniteCheckedException("WAL archive segment has been 
deleted [idx=" + low.index() + "]");
+
+        List<File> res = new ArrayList<>();
+
+        for (long i = low.index(); i < high.index(); i++) {
+            String segmentName = 
FileWriteAheadLogManager.FileDescriptor.fileName(i);
+
+            File file = new File(walArchiveDir, segmentName);
+            File fileZip = new File(walArchiveDir, segmentName + ".zip");
+
+            if (file.exists())
+                res.add(file);
+            else if (fileZip.exists())
+                res.add(fileZip);
+            else {
+                if (log.isInfoEnabled()) {
+                    log.info("Segment not found: " + file.getName() + "/" + 
fileZip.getName());
+
+                    log.info("Stopped iteration on idx: " + i);
+                }
+
+                break;
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int serializerVersion() {
         return serializerVersion;
     }
 
@@ -572,7 +618,13 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         // Need to calculate record size first.
         record.size(serializer.size(record));
 
-        for (; ; currWrHandle = rollOver(currWrHandle)) {
+        while (true) {
+            if (record.rollOver()){
+                assert cctx.database().checkpointLockIsHeldByThread();
+
+                currWrHandle = rollOver(currWrHandle);
+            }
+
             WALPointer ptr = currWrHandle.addRecord(record);
 
             if (ptr != null) {
@@ -585,6 +637,8 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
                 return ptr;
             }
+            else
+                currWrHandle = rollOver(currWrHandle);
 
             checkNode();
 
@@ -1040,7 +1094,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             createFile(first);
         }
         else
-            checkFiles(0, false, null);
+            checkFiles(0, false, null, null);
     }
 
     /**
@@ -1188,6 +1242,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
          */
         private Map<Long, Integer> locked = new HashMap<>();
 
+        /** Index of the last work segment file checked or created by {@link #allocateRemainingFiles()}. */
+        private int formatted;
+
         /**
          *
          */
@@ -1285,8 +1342,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                     while (curAbsWalIdx == -1 && !stopped)
                         wait();
 
-                    if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1)
-                        
changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1);
+                    // If the archive directory is empty, we can be sure that 
there were no WAL segments archived.
+                    // This is ensured by the check in truncate() which will 
leave at least one file there
+                    // once it was archived.
                 }
 
                 while (!Thread.currentThread().isInterrupted() && !stopped) {
@@ -1373,7 +1431,13 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                     // Notify archiver thread.
                     notifyAll();
 
-                    while (curAbsWalIdx - lastAbsArchivedIdx > 
dsCfg.getWalSegments() && cleanException == null)
+                    int segments = dsCfg.getWalSegments();
+
+                    while ((curAbsWalIdx - lastAbsArchivedIdx > segments && 
cleanException == null))
+                        wait();
+
+                    // Wait for formatter so that we do not open an empty file 
in DEFAULT mode.
+                    while (curAbsWalIdx % dsCfg.getWalSegments() > formatted)
                         wait();
 
                     return curAbsWalIdx;
@@ -1502,11 +1566,23 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
          * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()}
          */
         private void allocateRemainingFiles() throws IgniteCheckedException {
-            checkFiles(1, true, new IgnitePredicate<Integer>() {
-                @Override public boolean apply(Integer integer) {
-                    return !checkStop();
-                }
-            });
+            final FileArchiver archiver = this;
+
+            checkFiles(1,
+                true,
+                new IgnitePredicate<Integer>() {
+                    @Override public boolean apply(Integer integer) {
+                        return !checkStop();
+                    }
+                }, new CI1<Integer>() {
+                    @Override public void apply(Integer idx) {
+                        synchronized (archiver) {
+                            formatted = idx;
+
+                            archiver.notifyAll();
+                        }
+                    }
+                });
         }
     }
 
@@ -1821,7 +1897,12 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
      * @param p Predicate Exit condition.
      * @throws IgniteCheckedException if validation or create file fail.
      */
-    private void checkFiles(int startWith, boolean create, 
IgnitePredicate<Integer> p) throws IgniteCheckedException {
+    private void checkFiles(
+        int startWith,
+        boolean create,
+        @Nullable IgnitePredicate<Integer> p,
+        @Nullable IgniteInClosure<Integer> completionCallback
+    ) throws IgniteCheckedException {
         for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p 
!= null && p.apply(i))); i++) {
             File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
 
@@ -1835,6 +1916,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             }
             else if (create)
                 createFile(checkFile);
+
+            if (completionCallback != null)
+                completionCallback.apply(i);
         }
     }
 
@@ -2133,7 +2217,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         /**
          * @throws IgniteCheckedException If failed to close the WAL segment 
file.
          */
-        public void close() throws IgniteCheckedException {
+        @Override public void close() throws IgniteCheckedException {
             try {
                 fileIO.close();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/932692ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 6a35c8a..9b2a206 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -42,6 +42,11 @@ public class NoOpWALManager implements 
IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
+    @Override public int serializerVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public void resumeLogging(WALPointer ptr) throws 
IgniteCheckedException {
         // No-op.
     }

Reply via email to