Repository: ignite
Updated Branches:
  refs/heads/master 9862dbe03 -> ef4a02dc5


IGNITE-9438 Fix file descriptors leak in StandaloneWalRecordsIterator. - Fixes 
#4658.

Signed-off-by: Dmitriy Govorukhin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ef4a02dc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ef4a02dc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ef4a02dc

Branch: refs/heads/master
Commit: ef4a02dc5d3f66258afa9a3ded8d7671adf6ee73
Parents: 9862dbe
Author: Sergey Antonov <[email protected]>
Authored: Wed Sep 5 11:33:07 2018 +0300
Committer: Dmitriy Govorukhin <[email protected]>
Committed: Wed Sep 5 11:33:07 2018 +0300

----------------------------------------------------------------------
 .../wal/AbstractWalRecordsIterator.java         | 139 ++++++++----
 .../reader/StandaloneWalRecordsIterator.java    |  33 +--
 .../ignite/internal/util/IgniteUtils.java       |  16 ++
 .../StandaloneWalRecordsIteratorTest.java       | 216 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   8 +-
 5 files changed, 349 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ef4a02dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 9fbb535..0b704ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -36,6 +36,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.P2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -43,8 +44,8 @@ import org.jetbrains.annotations.Nullable;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 
 /**
- * Iterator over WAL segments. This abstract class provides most functionality 
for reading records in log.
- * Subclasses are to override segment switching functionality
+ * Iterator over WAL segments. This abstract class provides most functionality 
for reading records in log. Subclasses
+ * are to override segment switching functionality
  */
 public abstract class AbstractWalRecordsIterator
     extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> 
implements WALIterator {
@@ -52,14 +53,14 @@ public abstract class AbstractWalRecordsIterator
     private static final long serialVersionUID = 0L;
 
     /**
-     * Current record preloaded, to be returned on next()<br>
-     * Normally this should be not null because advance() method should 
already prepare some value<br>
+     * Current record preloaded, to be returned on next()<br> Normally this 
should be not null because advance() method
+     * should already prepare some value<br>
      */
     protected IgniteBiTuple<WALPointer, WALRecord> curRec;
 
     /**
-     * Current WAL segment absolute index. <br>
-     * Determined as lowest number of file at start, is changed during advance 
segment
+     * Current WAL segment absolute index. <br> Determined as lowest number of 
file at start, is changed during advance
+     * segment
      */
     protected long curWalSegmIdx = -1;
 
@@ -179,7 +180,6 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
-     *
      * @param tailReachedException Tail reached exception.
      * @param currWalSegment Current WAL segment read handler.
      * @return If need to throw exception after validation.
@@ -210,9 +210,8 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
-     * Switches records iterator to the next WAL segment
-     * as result of this method, new reference to segment should be returned.
-     * Null for current handle means stop of iteration.
+     * Switches records iterator to the next WAL segment. As a result of this 
method, a new reference to the segment should be
+     * returned. Null for current handle means stop of iteration.
      *
      * @param curWalSegment current open WAL segment or null if there is no 
open segment yet
      * @return new WAL segment to read or null for stop iteration
@@ -263,8 +262,8 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
-     * Performs final conversions with record loaded from WAL.
-     * To be overridden by subclasses if any processing required.
+     * Performs final conversions with record loaded from WAL. To be 
overridden by subclasses if any processing
+     * required.
      *
      * @param rec record to post process.
      * @return post processed record.
@@ -278,11 +277,11 @@ public abstract class AbstractWalRecordsIterator
      *
      * @param e problem from records reading
      * @param ptr file pointer was accessed
-     *
-     * @return {@code null} if the error was handled and we can go ahead,
-     *  {@code IgniteCheckedException} if the error was not handled, and we 
should stop the iteration.
+     * @return {@code null} if the error was handled and we can go ahead, 
{@code IgniteCheckedException} if the error
+     * was not handled, and we should stop the iteration.
      */
-    protected IgniteCheckedException handleRecordException(@NotNull final 
Exception e, @Nullable final FileWALPointer ptr) {
+    protected IgniteCheckedException handleRecordException(@NotNull final 
Exception e,
+        @Nullable final FileWALPointer ptr) {
         if (log.isInfoEnabled())
             log.info("Stopping WAL iteration due to an exception: " + 
e.getMessage() + ", ptr=" + ptr);
 
@@ -290,45 +289,92 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
+     * Assumes fileIO will be closed in this method if an error occurs.
+     *
      * @param desc File descriptor.
-     * @param start Optional start pointer. Null means read from the beginning
-     * @return Initialized file handle.
-     * @throws FileNotFoundException If segment file is missing.
+     * @param start Optional start pointer. Null means read from the beginning.
+     * @param fileIO fileIO associated with file descriptor
+     * @param segmentHeader read segment header from fileIO
+     * @return Initialized file read handle.
      * @throws IgniteCheckedException If initialized failed due to another 
unexpected error.
      */
     protected AbstractReadFileHandle initReadHandle(
         @NotNull final AbstractFileDescriptor desc,
-        @Nullable final FileWALPointer start
-    ) throws IgniteCheckedException, FileNotFoundException {
+        @Nullable final FileWALPointer start,
+        @NotNull final FileIO fileIO,
+        @NotNull final SegmentHeader segmentHeader
+    ) throws IgniteCheckedException {
         try {
-            FileIO fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) 
: ioFactory.create(desc.file());
+            final boolean isCompacted = segmentHeader.isCompacted();
+
+            if (isCompacted)
+                serializerFactory.skipPositionCheck(true);
+
+            FileInput in = new FileInput(fileIO, buf);
+
+            if (start != null && desc.idx() == start.index()) {
+                if (isCompacted) {
+                    if (start.fileOffset() != 0)
+                        serializerFactory.recordDeserializeFilter(new 
StartSeekingFilter(start));
+                }
+                else {
+                    // Make sure we skip header with serializer version.
+                    long startOff = Math.max(start.fileOffset(), 
fileIO.position());
 
+                    in.seek(startOff);
+                }
+            }
+
+            int serVer = segmentHeader.getSerializerVersion();
+
+            return createReadFileHandle(fileIO, desc.idx(), 
serializerFactory.createSerializer(serVer), in);
+        }
+        catch (SegmentEofException | EOFException ignore) {
             try {
-                SegmentHeader segmentHeader = readSegmentHeader(fileIO, 
curWalSegmIdx);
+                fileIO.close();
+            }
+            catch (IOException ce) {
+                throw new IgniteCheckedException(ce);
+            }
 
-                boolean isCompacted = segmentHeader.isCompacted();
+            return null;
+        }
+        catch (IgniteCheckedException e) {
+            U.closeWithSuppressingException(fileIO, e);
 
-                if (isCompacted)
-                    serializerFactory.skipPositionCheck(true);
+            throw e;
+        }
+        catch (IOException e) {
+            U.closeWithSuppressingException(fileIO, e);
 
-                FileInput in = new FileInput(fileIO, buf);
+            throw new IgniteCheckedException(
+                "Failed to initialize WAL segment after reading segment 
header: " + desc.file().getAbsolutePath(), e);
+        }
+    }
 
-                if (start != null && desc.idx() == start.index()) {
-                    if (isCompacted) {
-                        if (start.fileOffset() != 0)
-                            serializerFactory.recordDeserializeFilter(new 
StartSeekingFilter(start));
-                    }
-                    else {
-                        // Make sure we skip header with serializer version.
-                        long startOff = Math.max(start.fileOffset(), 
fileIO.position());
+    /**
+     * Assumes file descriptor will be opened in this method. The caller of 
this method is responsible for closing
+     * the opened file descriptor. The file descriptor will be closed by this method ONLY if an 
error occurs.
+     *
+     * @param desc File descriptor.
+     * @param start Optional start pointer. Null means read from the beginning
+     * @return Initialized file read handle.
+     * @throws FileNotFoundException If segment file is missing.
+     * @throws IgniteCheckedException If initialized failed due to another 
unexpected error.
+     */
+    protected AbstractReadFileHandle initReadHandle(
+        @NotNull final AbstractFileDescriptor desc,
+        @Nullable final FileWALPointer start
+    ) throws IgniteCheckedException, FileNotFoundException {
+        FileIO fileIO = null;
 
-                        in.seek(startOff);
-                    }
-                }
+        try {
+            fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) : 
ioFactory.create(desc.file());
 
-                int serVer = segmentHeader.getSerializerVersion();
+            SegmentHeader segmentHeader;
 
-                return createReadFileHandle(fileIO, desc.idx(), 
serializerFactory.createSerializer(serVer), in);
+            try {
+                segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx);
             }
             catch (SegmentEofException | EOFException ignore) {
                 try {
@@ -341,20 +387,21 @@ public abstract class AbstractWalRecordsIterator
                 return null;
             }
             catch (IOException | IgniteCheckedException e) {
-                try {
-                    fileIO.close();
-                }
-                catch (IOException ce) {
-                    e.addSuppressed(ce);
-                }
+                U.closeWithSuppressingException(fileIO, e);
 
                 throw e;
             }
+
+            return initReadHandle(desc, start, fileIO, segmentHeader);
         }
         catch (FileNotFoundException e) {
+            U.closeQuiet(fileIO);
+
             throw e;
         }
         catch (IOException e) {
+            U.closeQuiet(fileIO);
+
             throw new IgniteCheckedException(
                 "Failed to initialize WAL segment: " + 
desc.file().getAbsolutePath(), e);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ef4a02dc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index 6729790..a222877 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -48,8 +48,10 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
@@ -58,8 +60,8 @@ import org.jetbrains.annotations.Nullable;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 
 /**
- * WAL reader iterator, for creation in standalone WAL reader tool
- * Operates over one directory, does not provide start and end boundaries
+ * WAL reader iterator, for creation in standalone WAL reader tool. Operates 
over one directory, does not provide start
+ * and end boundaries.
  */
 class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
     /** Record buffer size */
@@ -88,12 +90,13 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
 
     /**
      * Creates iterator in file-by-file iteration mode. Directory
+     *
      * @param log Logger.
      * @param sharedCtx Shared context. Cache processor is to be configured if 
Cache Object Key & Data Entry is
      * required.
      * @param ioFactory File I/O factory.
-     * @param keepBinary Keep binary. This flag disables converting of non 
primitive types
-     * (BinaryObjects will be used instead)
+     * @param keepBinary Keep binary. This flag disables converting of non 
primitive types (BinaryObjects will be used
+     * instead)
      * @param walFiles Wal files.
      */
     StandaloneWalRecordsIterator(
@@ -128,8 +131,8 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     }
 
     /**
-     * For directory mode sets oldest file as initial segment,
-     * for file by file mode, converts all files to descriptors and gets 
oldest as initial.
+     * For directory mode sets oldest file as initial segment, for file by 
file mode, converts all files to descriptors
+     * and gets oldest as initial.
      *
      * @param walFiles files for file-by-file iteration mode
      */
@@ -232,7 +235,6 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     }
 
     /**
-     *
      * @param ptr WAL pointer.
      * @return {@code True} If pointer between low and high bounds. {@code 
False} if not.
      */
@@ -243,7 +245,6 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     }
 
     /**
-     *
      * @param idx WAL segment index.
      * @return {@code True} If pointer between low and high bounds. {@code 
False} if not.
      */
@@ -258,18 +259,21 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
     ) throws IgniteCheckedException, FileNotFoundException {
 
         AbstractFileDescriptor fd = desc;
-
+        FileIO fileIO = null;
+        SegmentHeader segmentHeader;
         while (true) {
             try {
-                FileIO fileIO = fd.isCompressed() ? new UnzipFileIO(fd.file()) 
: ioFactory.create(fd.file());
+                fileIO = fd.isCompressed() ? new UnzipFileIO(fd.file()) : 
ioFactory.create(fd.file());
 
-                readSegmentHeader(fileIO, curWalSegmIdx);
+                segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx);
 
                 break;
             }
             catch (IOException | IgniteCheckedException e) {
                 log.error("Failed to init segment curWalSegmIdx=" + 
curWalSegmIdx + ", curIdx=" + curIdx, e);
 
+                U.closeQuiet(fileIO);
+
                 curIdx++;
 
                 if (curIdx >= walFileDescriptors.size())
@@ -279,13 +283,13 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
             }
         }
 
-        return super.initReadHandle(fd, start);
+        return initReadHandle(fd, start, fileIO, segmentHeader);
     }
 
     /** {@inheritDoc} */
     @NotNull @Override protected WALRecord postProcessRecord(@NotNull final 
WALRecord rec) {
-         GridKernalContext kernalCtx = sharedCtx.kernalContext();
-         IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
+        GridKernalContext kernalCtx = sharedCtx.kernalContext();
+        IgniteCacheObjectProcessor processor = kernalCtx.cacheObjects();
 
         if (processor != null && rec.type() == RecordType.DATA_RECORD) {
             try {
@@ -335,6 +339,7 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
 
     /**
      * Converts entry or lazy data entry into unwrapped entry
+     *
      * @param processor cache object processor for de-serializing objects.
      * @param fakeCacheObjCtx cache object context for de-serializing binary 
and unwrapping objects.
      * @param dataEntry entry to process

http://git-wip-us.apache.org/repos/asf/ignite/blob/ef4a02dc/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index a68fb6d..d2d2b89 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -4020,6 +4020,22 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * Closes given resource; any exception thrown by {@code close()} is added to {@code e} as suppressed.
+     *
+     * @param rsrc Resource to close. If it's {@code null} - it's no-op.
+     * @param e Exception that receives a close failure as a suppressed exception.
+     */
+    public static void closeWithSuppressingException(@Nullable AutoCloseable 
rsrc, @NotNull Exception e) {
+        if (rsrc != null)
+            try {
+                rsrc.close();
+            }
+            catch (Exception suppressed) {
+               e.addSuppressed(suppressed);
+            }
+    }
+
+    /**
      * Quietly closes given resource ignoring possible checked exception.
      *
      * @param rsrc Resource to close. If it's {@code null} - it's no-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ef4a02dc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
new file mode 100644
index 0000000..b6a04d0
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIteratorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+
+/**
+ * The test checks that StandaloneWalRecordsIterator correctly closes file 
descriptors associated with WAL files.
+ */
+public class StandaloneWalRecordsIteratorTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration().
+                setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        ).setDiscoverySpi(
+            new TcpDiscoverySpi()
+                .setIpFinder(IP_FINDER)
+        );
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Check correct closing file descriptors.
+     *
+     * @throws Exception if test failed.
+     */
+    public void testCorrectClosingFileDescriptors() throws Exception {
+        IgniteEx ig = (IgniteEx)startGrid();
+
+        String archiveWalDir = getArchiveWalDirPath(ig);
+
+        ig.cluster().active(true);
+
+        IgniteCacheDatabaseSharedManager sharedMgr = 
ig.context().cache().context().database();
+
+        IgniteWriteAheadLogManager walMgr = 
ig.context().cache().context().wal();
+
+        // Generate WAL segments for filling WAL archive folder.
+        for (int i = 0; i < 2 * 
ig.configuration().getDataStorageConfiguration().getWalSegments(); i++) {
+            sharedMgr.checkpointReadLock();
+
+            try {
+                walMgr.log(new SnapshotRecord(i, false));
+            }
+            finally {
+                sharedMgr.checkpointReadUnlock();
+            }
+        }
+
+        stopGrid();
+
+        // Iterate by all archived WAL segments.
+        createWalIterator(archiveWalDir).forEach(x -> {
+        });
+
+        assertTrue("At least one WAL file must be opened!", 
CountedFileIO.getCountOpenedWalFiles() > 0);
+
+        assertEquals("All WAL files must be closed!", 
CountedFileIO.getCountOpenedWalFiles(), CountedFileIO.getCountClosedWalFiles());
+    }
+
+    /**
+     * Creates WALIterator associated with files inside walDir.
+     *
+     * @param walDir - path to WAL directory.
+     * @return WALIterator associated with files inside walDir.
+     * @throws IgniteCheckedException if an error occurs.
+     */
+    private WALIterator createWalIterator(String walDir) throws 
IgniteCheckedException {
+        IteratorParametersBuilder params = new IteratorParametersBuilder();
+
+        params.ioFactory(new CountedFileIOFactory());
+
+        return new 
IgniteWalIteratorFactory(log).iterator(params.filesOrDirs(walDir));
+    }
+
+    /**
+     * Evaluate path to directory with WAL archive.
+     *
+     * @param ignite instance of Ignite.
+     * @return path to directory with WAL archive.
+     * @throws IgniteCheckedException if an error occurs.
+     */
+    private String getArchiveWalDirPath(Ignite ignite) throws 
IgniteCheckedException {
+        return U.resolveWorkDirectory(
+            U.defaultWorkDirectory(),
+            
ignite.configuration().getDataStorageConfiguration().getWalArchivePath(),
+            false
+        ).getAbsolutePath();
+    }
+
+    /**
+     *
+     */
+    private static class CountedFileIOFactory extends 
RandomAccessFileIOFactory {
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, StandardOpenOption.CREATE, 
StandardOpenOption.READ, StandardOpenOption.WRITE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws 
IOException {
+            return new CountedFileIO(file, modes);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CountedFileIO extends RandomAccessFileIO {
+        /** Wal open counter. */
+        private static final AtomicInteger WAL_OPEN_COUNTER = new 
AtomicInteger();
+        /** Wal close counter. */
+        private static final AtomicInteger WAL_CLOSE_COUNTER = new 
AtomicInteger();
+
+        /** File name. */
+        private final String fileName;
+
+        /** */
+        public CountedFileIO(File file, OpenOption... modes) throws 
IOException {
+            super(file, modes);
+
+            fileName = file.getName();
+
+            if 
(FileWriteAheadLogManager.WAL_NAME_PATTERN.matcher(fileName).matches())
+                WAL_OPEN_COUNTER.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            super.close();
+
+            if 
(FileWriteAheadLogManager.WAL_NAME_PATTERN.matcher(fileName).matches())
+                WAL_CLOSE_COUNTER.incrementAndGet();
+        }
+
+        /**
+         *
+         * @return number of opened files.
+         */
+        public static int getCountOpenedWalFiles() { return 
WAL_OPEN_COUNTER.get(); }
+
+        /**
+         *
+         * @return number of closed files.
+         */
+        public static int getCountClosedWalFiles() { return 
WAL_CLOSE_COUNTER.get(); }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ef4a02dc/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 5eba5a4..956d256 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -34,7 +34,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOf
 import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
-import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
@@ -53,13 +52,13 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
-import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoverySeveralRestartsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveLogOnlyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIteratorTest;
 
 /**
  *
@@ -89,7 +88,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
     }
 
     /**
-     * Fills {@code suite} with PDS test subset, which operates with real page 
store, but requires long time to execute.
+     * Fills {@code suite} with PDS test subset, which operates with real page 
store, but requires long time to
+     * execute.
      *
      * @param suite suite to add tests into.
      */
@@ -171,6 +171,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgniteNodeStoppedDuringDisableWALTest.class);
 
+        suite.addTestSuite(StandaloneWalRecordsIteratorTest.class);
+
         //suite.addTestSuite(IgniteWalRecoverySeveralRestartsTest.class);
 
         suite.addTestSuite(IgniteRebalanceScheduleResendPartitionsTest.class);

Reply via email to