This is an automated email from the ASF dual-hosted git repository. dgovorukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6670daf IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart 6670daf is described below commit 6670daf79ae41be2adcf75765dd90fcbfa675e30 Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> AuthorDate: Tue Apr 16 18:01:42 2019 +0300 IGNITE-11641 Fixed server node copies a lot of WAL files in WAL archive after restart --- .../persistence/wal/FileWriteAheadLogManager.java | 167 +++++++++++++------ .../db/IgnitePdsStartWIthEmptyArchive.java | 178 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite4.java | 2 + 3 files changed, 301 insertions(+), 46 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index c6cdf7c..cf83e3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInput; import java.io.EOFException; import java.io.File; import java.io.FileFilter; @@ -29,6 +30,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.sql.Time; @@ -41,7 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.concurrent.PriorityBlockingQueue; import 
java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; @@ -126,6 +128,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.CREATE_NEW; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE; @@ -137,8 +140,10 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; /** @@ -445,22 +450,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } }); - IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices(); - segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled()); - segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1); - - long lastAbsArchivedIdx = tup == null ? 
-1 : tup.get2(); - if (isArchiverEnabled()) - archiver = new FileArchiver(lastAbsArchivedIdx, log); + archiver = new FileArchiver(segmentAware, log); else archiver = null; - if (lastAbsArchivedIdx > 0) - segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); - if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(log); @@ -1098,41 +1094,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Lists files in archive directory and returns the indices of least and last archived files. - * In case of holes, first segment after last "hole" is considered as minimum. - * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20 - * - * @return The absolute indices of min and max archived files. + * @param file File to read. + * @param ioFactory IO factory. */ - private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() { - TreeSet<Long> archiveIndices = new TreeSet<>(); - - for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { - try { - long idx = Long.parseLong(file.getName().substring(0, 16)); + private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { + FileDescriptor ds = new FileDescriptor(file); - archiveIndices.add(idx); - } - catch (NumberFormatException | IndexOutOfBoundsException ignore) { - // No-op. - } - } + try ( + SegmentIO fileIO = ds.toIO(ioFactory); + ByteBufferExpander buf = new ByteBufferExpander(HEADER_RECORD_SIZE, ByteOrder.nativeOrder()) + ) { + final DataInput in = segmentFileInputFactory.createFileInput(fileIO, buf); - if (archiveIndices.isEmpty()) - return null; - else { - Long min = archiveIndices.first(); - Long max = archiveIndices.last(); + // Header record must be agnostic to the serializer version. + final int type = in.readUnsignedByte(); - if (max - min == archiveIndices.size() - 1) - return F.t(min, max); // Short path. 
+ if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) { + if (log.isInfoEnabled()) + log.info("Reached logical end of the segment for file " + file); - for (Long idx : archiveIndices.descendingSet()) { - if (!archiveIndices.contains(idx - 1)) - return F.t(idx, max); + return null; } - throw new IllegalStateException("Should never happen if TreeSet is valid."); + FileWALPointer ptr = readPosition(in); + + return new FileDescriptor(file, ptr.index()); + } + catch (IOException e) { + U.warn(log, "Failed to read file header [" + file + "]. Skipping this file", e); + + return null; } } @@ -1476,7 +1467,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (log.isDebugEnabled()) log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); - File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX); + File tmp = new File(file.getParent(), file.getName() + TMP_SUFFIX); formatFile(tmp); @@ -1618,11 +1609,95 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * */ - private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) { + private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException { super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log, cctx.kernalContext().workersRegistry()); - segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); + init(segmentAware); + } + + /** + * @param segmentAware Segment aware. + * @throws IgniteCheckedException If initialization failed. + */ + private void init(SegmentAware segmentAware) throws IgniteCheckedException { + IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices(); + + segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1); + + long lastAbsArchivedIdx = tup == null ? 
-1 : tup.get2(); + + if (lastAbsArchivedIdx >= 0) + segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); + } + + /** + * Lists files in archive directory and returns the indices of least and last archived files; if the archive is empty, falls back to the work directory. + * In case of holes, first segment after last "hole" is considered as minimum. + * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20 + * + * @return The absolute indices of min and max archived files, or {@code null} if none were found. + */ + private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() throws IgniteCheckedException { + TreeMap<Long, FileDescriptor> archiveIndices = new TreeMap<>(); + + for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { + try { + long idx = Long.parseLong(file.getName().substring(0, 16)); + + FileDescriptor desc = readFileDescriptor(file, ioFactory); + + if (desc != null) { + if (desc.idx() == idx) + archiveIndices.put(desc.idx(), desc); + } + else + log.warning("Skip file, failed read file header " + file); + } + catch (NumberFormatException | IndexOutOfBoundsException ignore) { + log.warning("Skip file " + file); + } + } + + if (!archiveIndices.isEmpty()) { + Long min = archiveIndices.navigableKeySet().first(); + Long max = archiveIndices.navigableKeySet().last(); + + if (max - min == archiveIndices.size() - 1) + return F.t(min, max); // Short path. + + // Archive has gaps: scanning down from max, the first index whose predecessor is missing is the min. + for (Long idx : archiveIndices.descendingKeySet()) { + if (!archiveIndices.keySet().contains(idx - 1)) + return F.t(idx, max); + } + + throw new IllegalStateException("Should never happen if archiveIndices TreeMap is valid."); + } + + // If WAL archive is empty, copy the oldest (lowest-index) work directory segment to the WAL archive, provided the work directory holds more than one segment.
+ TreeMap<Long, FileDescriptor> workIndices = new TreeMap<>(); + + for (File file : walWorkDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { + FileDescriptor desc = readFileDescriptor(file, ioFactory); + + if (desc != null) + workIndices.put(desc.idx(), desc); + } + + if (!workIndices.isEmpty()) { + FileDescriptor first = workIndices.firstEntry().getValue(); + FileDescriptor last = workIndices.lastEntry().getValue(); + + if (first.idx() != last.idx()) { + archiveSegment(first.idx()); + + // Use copied segment as min archived segment. + return F.t(first.idx(), first.idx()); + } + } + + return null; } /** @@ -1799,14 +1874,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param absIdx Absolute index to archive. */ - private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException { + public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException { long segIdx = absIdx % dsCfg.getWalSegments(); File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); String name = FileDescriptor.fileName(absIdx); - File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX); + File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX); File dstFile = new File(walArchiveDir, name); @@ -2037,7 +2112,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl deleteObsoleteRawSegments(); File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) - + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX); + + FilePageStoreManager.ZIP_SUFFIX + TMP_SUFFIX); File zip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX); @@ -2225,7 +2300,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + FilePageStoreManager.ZIP_SUFFIX); File unzipTmp = new File(walArchiveDir, 
FileDescriptor.fileName(segmentToDecompress) - + FilePageStoreManager.TMP_SUFFIX); + + TMP_SUFFIX); File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java new file mode 100644 index 0000000..5104e65 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsStartWIthEmptyArchive.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.io.File; +import java.util.Arrays; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.WalSegmentArchivedEvent; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle; +import org.apache.ignite.internal.util.future.CountDownFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER; + +/** + * + */ +public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setCacheConfiguration( + new CacheConfiguration(DEFAULT_CACHE_NAME) + ); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + // Checkpoint should not remove any WAL archive files. 
+ .setMaxWalArchiveSize(Long.MAX_VALUE) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + return cfg; + } + + @Before + public void before() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + @Test + public void test() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().active(true); + + FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal(); + + // Populate data for generate WAL archive segments. + try (IgniteDataStreamer<Integer, byte[]> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) { + int entries = 1000; + + for (int i = 0; i < entries; i++) { + st.addData(i, new byte[1024 * 1024]); + } + } + + File archiveDir = U.field(walMgr, "walArchiveDir"); + + stopGrid(0, false); + + SegmentAware beforeSaw = U.field(walMgr, "segmentAware"); + + long beforeLastArchivedAbsoluteIndex = beforeSaw.lastArchivedAbsoluteIndex(); + + FileWriteHandle fhBefore = U.field(walMgr, "currHnd"); + + long idxBefore = fhBefore.getSegmentId(); + + File[] files = archiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER); + + Arrays.sort(files); + + // Cleanup archive directory. + for (File f : files) { + if (f.delete()) + log.info("File " + f.getAbsolutePath() + " deleted"); + } + + Assert.assertEquals(0, archiveDir.listFiles().length); + + // Restart grid again after archive was removed. 
+ ig = startGrid(0); + + walMgr = (FileWriteAheadLogManager)ig.context().cache().context().wal(); + + SegmentAware afterSaw = U.field(walMgr, "segmentAware"); + + long afterLastArchivedAbsoluteIndex = afterSaw.lastArchivedAbsoluteIndex(); + + int segments = ig.configuration().getDataStorageConfiguration().getWalSegments(); + + Assert.assertTrue( + "lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex + + ", lastArchivedAfterIdx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments, + afterLastArchivedAbsoluteIndex >= + (beforeLastArchivedAbsoluteIndex - segments)); + + ig.cluster().active(true); + + FileWriteHandle fhAfter = U.field(walMgr, "currHnd"); + + Assert.assertNotNull(fhAfter); + + long idxAfter = fhAfter.getSegmentId(); + + Assert.assertEquals(idxBefore, idxAfter); + Assert.assertTrue(idxAfter >= beforeLastArchivedAbsoluteIndex); + + // Future that completes once all currently available segments have been archived. + CountDownFuture awaitAchviedSegmentsLatch = new CountDownFuture( + // Minus two: one for the last archived segment, one for the current write segment. + (int)(idxAfter - afterLastArchivedAbsoluteIndex - 2) + ); + + log.info("currentIdx=" + idxAfter + ", lastArchivedBeforeIdx=" + beforeLastArchivedAbsoluteIndex + + ", lastArchivedAfteridx=" + afterLastArchivedAbsoluteIndex + ", segments=" + segments); + + ig.events().localListen(e -> { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + + log.info("EVT_WAL_SEGMENT_ARCHIVED:" + archComplEvt.getAbsWalSegmentIdx()); + + if (archComplEvt.getAbsWalSegmentIdx() > afterLastArchivedAbsoluteIndex){ + awaitAchviedSegmentsLatch.onDone(); + + return true; + } + + if (archComplEvt.getAbsWalSegmentIdx() < afterLastArchivedAbsoluteIndex){ + awaitAchviedSegmentsLatch.onDone(new IgniteException("Unexected segment for archivation.
idx=" + + archComplEvt.getAbsWalSegmentIdx())); + + return false; + } + + return true; + }, EVT_WAL_SEGMENT_ARCHIVED); + + awaitAchviedSegmentsLatch.get(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java index 5e48ee4..72967c0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCanc import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheWalDisabledOnRebalancingTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest; import org.apache.ignite.internal.processors.cache.persistence.file.FileDownloaderTest; import org.apache.ignite.testframework.GridTestUtils; @@ -63,6 +64,7 @@ public class IgnitePdsTestSuite4 { GridTestUtils.addTestIfNeeded(suite, IgniteRebalanceOnCachesStoppingOrDestroyingTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CachePageWriteLockUnlockTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests); return suite; }