ignite-5938 WAL logs compaction and compression after checkpoint Signed-off-by: agura <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f50b2354 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f50b2354 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f50b2354 Branch: refs/heads/ignite-zk Commit: f50b2354d6b87084f0c29376915821296f3fd480 Parents: 010d02b Author: Ivan Rakov <[email protected]> Authored: Fri Nov 24 18:59:16 2017 +0300 Committer: agura <[email protected]> Committed: Fri Nov 24 18:59:52 2017 +0300 ---------------------------------------------------------------------- ...tingToWalV2SerializerWithCompactionTest.java | 250 +++++++ .../IgniteCompatibilityBasicTestSuite.java | 3 + .../apache/ignite/IgniteSystemProperties.java | 5 + .../configuration/DataStorageConfiguration.java | 27 + .../pagemem/wal/IgniteWriteAheadLogManager.java | 8 + .../pagemem/wal/record/FilteredRecord.java | 31 + .../pagemem/wal/record/MarshalledRecord.java | 61 ++ .../GridCacheDatabaseSharedManager.java | 5 + .../wal/AbstractWalRecordsIterator.java | 73 ++- .../cache/persistence/wal/FileInput.java | 8 +- .../wal/FileWriteAheadLogManager.java | 650 +++++++++++++++---- .../persistence/wal/RecordDataSerializer.java | 58 -- .../cache/persistence/wal/RecordSerializer.java | 62 -- .../SingleSegmentLogicalRecordsIterator.java | 141 ++++ .../reader/StandaloneWalRecordsIterator.java | 5 +- .../persistence/wal/record/HeaderRecord.java | 7 +- .../persistence/wal/record/RecordTypes.java | 69 ++ .../wal/serializer/RecordDataSerializer.java | 59 ++ .../wal/serializer/RecordDataV1Serializer.java | 11 +- .../wal/serializer/RecordDataV2Serializer.java | 19 +- .../wal/serializer/RecordSerializer.java | 63 ++ .../wal/serializer/RecordSerializerFactory.java | 71 ++ .../serializer/RecordSerializerFactoryImpl.java | 133 ++++ .../wal/serializer/RecordV1Serializer.java | 71 +- .../wal/serializer/RecordV2Serializer.java | 93 ++- .../utils/PlatformConfigurationUtils.java | 2 + 
.../db/wal/IgniteWalFlushFailoverTest.java | 16 +- .../wal/IgniteWalHistoryReservationsTest.java | 2 +- .../db/wal/IgniteWalRecoveryTest.java | 2 +- .../IgniteWalRecoveryWithCompactionTest.java | 33 + .../db/wal/IgniteWalSerializerVersionTest.java | 9 +- .../persistence/db/wal/WalCompactionTest.java | 312 +++++++++ .../persistence/pagemem/NoOpWALManager.java | 5 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + .../IgnitePdsWithIndexingCoreTestSuite.java | 2 + .../Configuration/DataStorageConfiguration.cs | 16 + .../IgniteConfigurationSection.xsd | 5 + 37 files changed, 2102 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java ---------------------------------------------------------------------- diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java new file mode 100644 index 0000000..0ca3833 --- /dev/null +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/MigratingToWalV2SerializerWithCompactionTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compatibility.persistence; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; + +/** + * Saves data using a previous version of Ignite and then loads this data using the current version. + */ +public class MigratingToWalV2SerializerWithCompactionTest extends IgnitePersistenceCompatibilityAbstractTest { + /** */ + private static final String TEST_CACHE_NAME = DummyPersistenceCompatibilityTest.class.getSimpleName(); + + /** Entries count. */ + private static final int ENTRIES = 300; + + /** Wal segment size. */ + private static final int WAL_SEGMENT_SIZE = 1024 * 1024; + + /** Entry payload size.
*/ + private static final int PAYLOAD_SIZE = 20000; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false)); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(false); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true)) + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalCompactionEnabled(true) + .setWalMode(WALMode.LOG_ONLY) + .setWalHistorySize(200); + + cfg.setDataStorageConfiguration(memCfg); + + return cfg; + } + + /** + * Tests opportunity to read data from previous Ignite DB version. + * + * @throws Exception If failed. + */ + public void testCompactingOldWalFiles() throws Exception { + doTestStartupWithOldVersion("2.3.0"); + } + + /** + * Tests opportunity to read data from previous Ignite DB version. + * + * @param ver 3-digits version of ignite + * @throws Exception If failed. + */ + private void doTestStartupWithOldVersion(String ver) throws Exception { + try { + startGrid(1, ver, new ConfigurationClosure(), new PostStartupClosure()); + + stopAllGrids(); + + IgniteEx ignite = startGrid(0); + + ignite.active(true); + + IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(TEST_CACHE_NAME); + + for (int i = ENTRIES; i < ENTRIES * 2; i++) { + final byte[] val = new byte[PAYLOAD_SIZE]; + + ThreadLocalRandom.current().nextBytes(val); + + val[i] = 1; + + cache.put(i, val); + } + + // WAL archive segment is allowed to be compressed when it's at least one checkpoint away from current WAL head. 
+ ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + ignite.context().cache().context().database().wakeupForCheckpoint("Forced checkpoint").get(); + + Thread.sleep(15_000); // Time to compress WAL. + + int expCompressedWalSegments = PAYLOAD_SIZE * ENTRIES * 4 / WAL_SEGMENT_SIZE - 1; + + String nodeFolderName = ignite.context().pdsFolderResolver().resolveFolders().folderName(); + + File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); + File walDir = new File(dbDir, "wal"); + File archiveDir = new File(walDir, "archive"); + File nodeArchiveDir = new File(archiveDir, nodeFolderName); + + File[] compressedSegments = nodeArchiveDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.endsWith(".wal.zip"); + } + }); + + final int actualCompressedWalSegments = compressedSegments == null ? 0 : compressedSegments.length; + + assertTrue("expected=" + expCompressedWalSegments + ", actual=" + actualCompressedWalSegments, + actualCompressedWalSegments >= expCompressedWalSegments); + + stopAllGrids(); + + File nodeLfsDir = new File(dbDir, nodeFolderName); + File cpMarkersDir = new File(nodeLfsDir, "cp"); + + File[] cpMarkers = cpMarkersDir.listFiles(); + + assertNotNull(cpMarkers); + assertTrue(cpMarkers.length > 0); + + File cacheDir = new File(nodeLfsDir, "cache-" + TEST_CACHE_NAME); + File[] partFiles = cacheDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("part"); + } + }); + + assertNotNull(partFiles); + assertTrue(partFiles.length > 0); + + // Enforce reading WAL from the very beginning at the next start. + for (File f : cpMarkers) + f.delete(); + + for (File f : partFiles) + f.delete(); + + ignite = startGrid(0); + + ignite.active(true); + + cache = ignite.cache(TEST_CACHE_NAME); + + boolean fail = false; + + // Check that all data is recovered from compacted WAL. 
+ for (int i = 0; i < ENTRIES * 2; i++) { + byte[] arr = cache.get(i); + + if (arr == null) { + System.out.println(">>> Missing: " + i); + + fail = true; + } + else if (arr[i] != 1) { + System.out.println(">>> Corrupted: " + i); + + fail = true; + } + } + + assertFalse(fail); + } + finally { + stopAllGrids(); + } + } + + /** */ + private static class PostStartupClosure implements IgniteInClosure<Ignite> { + /** {@inheritDoc} */ + @Override public void apply(Ignite ignite) { + ignite.active(true); + + CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(); + cacheCfg.setName(TEST_CACHE_NAME); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setBackups(0); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache(cacheCfg); + + for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total. + final byte[] val = new byte[20000]; + + ThreadLocalRandom.current().nextBytes(val); + + val[i] = 1; + + cache.put(i, val); + } + } + } + + /** */ + private static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public void apply(IgniteConfiguration cfg) { + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setPeerClassLoadingEnabled(false); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setPersistenceEnabled(true)) + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalMode(WALMode.LOG_ONLY) + .setWalHistorySize(100); + + cfg.setDataStorageConfiguration(memCfg); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java 
---------------------------------------------------------------------- diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java index 351a0e7..20643d4 100644 --- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.compatibility.testsuites; import junit.framework.TestSuite; import org.apache.ignite.compatibility.persistence.DummyPersistenceCompatibilityTest; import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest; +import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest; /** * Compatibility tests basic test suite. 
@@ -36,6 +37,8 @@ public class IgniteCompatibilityBasicTestSuite { suite.addTestSuite(FoldersReuseCompatibilityTest.class); + suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index e0ace11..8e2298f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -716,6 +716,11 @@ public final class IgniteSystemProperties { public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL"; /** + * + */ + public static final String IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD = "IGNITE_WAL_ARCHIVE_COMPACT_SKIP_DELTA_RECORD"; + + /** * Tasks stealing will be started if tasks queue size per data-streamer thread exceeds this threshold. * <p> * Default value is {@code 4}. http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 8202ef8..2c90398 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -145,6 +145,9 @@ public class DataStorageConfiguration implements Serializable { /** Default write throttling enabled. 
*/ public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false; + /** Default wal compaction enabled. */ + public static final boolean DFLT_WAL_COMPACTION_ENABLED = false; + /** Size of a memory chunk reserved for system cache initially. */ private long sysRegionInitSize = DFLT_SYS_CACHE_INIT_SIZE; @@ -243,6 +246,12 @@ public class DataStorageConfiguration implements Serializable { private boolean writeThrottlingEnabled = DFLT_WRITE_THROTTLING_ENABLED; /** + * Flag to enable WAL compaction. If true, system filters and compresses WAL archive in background. + * Compressed WAL archive gets automatically decompressed on demand. + */ + private boolean walCompactionEnabled = DFLT_WAL_COMPACTION_ENABLED; + + /** * Initial size of a data region reserved for system cache. * * @return Size in bytes. @@ -850,4 +859,22 @@ public class DataStorageConfiguration implements Serializable { return this; } + + /** + * @return Flag indicating whether WAL compaction is enabled. + */ + public boolean isWalCompactionEnabled() { + return walCompactionEnabled; + } + + /** + * Sets flag indicating whether WAL compaction is enabled. + * + * @param walCompactionEnabled Wal compaction enabled flag. 
+ */ + public DataStorageConfiguration setWalCompactionEnabled(boolean walCompactionEnabled) { + this.walCompactionEnabled = walCompactionEnabled; + + return this; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index ce28ff2..42d9611 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -102,6 +102,14 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni public int truncate(WALPointer ptr); /** + * Gives a hint to WAL manager to compact WAL until given pointer (exclusively). + * Compaction implies filtering out physical records and ZIP compression. + * + * @param ptr Pointer for which it is safe to compact the log. + */ + public void allowCompressionUntil(WALPointer ptr); + + /** * @return Total number of segments in the WAL archive. 
*/ public int walArchiveSegments(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java new file mode 100644 index 0000000..519e825 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java @@ -0,0 +1,31 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.pagemem.wal.record; + +import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator; + +/** + * Special type of WAL record. Shouldn't be stored in file. + * Returned by deserializer if next record is not matched by filter. Automatically handled by + * {@link AbstractWalRecordsIterator}. 
+ */ +public class FilteredRecord extends WALRecord { + /** {@inheritDoc} */ + @Override public RecordType type() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java new file mode 100644 index 0000000..448a32c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MarshalledRecord.java @@ -0,0 +1,61 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.pagemem.wal.record; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; + +/** + * Special type of WAL record. Shouldn't be stored in file. + * Contains complete binary representation of record in {@link #buf} and record position in {@link #pos}. 
+ */ +public class MarshalledRecord extends WALRecord { + /** Type of marshalled record. */ + private WALRecord.RecordType type; + + /** + * Heap buffer with marshalled record bytes. + * Due to performance reasons accessible only by thread that performs WAL iteration and until next record is read. + */ + private ByteBuffer buf; + + /** + * @param type Type of marshalled record. + * @param pos WAL pointer to record. + * @param buf Reusable buffer with record data. + */ + public MarshalledRecord(RecordType type, WALPointer pos, ByteBuffer buf) { + this.type = type; + this.buf = buf; + + position(pos); + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return type; + } + + /** + * @return Buffer with marshalled record bytes. Due to performance reasons accessible only by thread that performs + * WAL iteration and until next record is read. + */ + public ByteBuffer buffer() { + return buf; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index e07aef7..c0e59bc 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1447,6 +1447,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan cctx.pageStore().beginRecover(); } + else + cctx.wal().allowCompressionUntil(status.startPtr); long start = U.currentTimeMillis(); int applied = 0; @@ -2973,6 +2975,9 @@ public class 
GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } chp.walFilesDeleted = deleted; + + if (!chp.cpPages.isEmpty()) + cctx.wal().allowCompressionUntil(chp.cpEntry.checkpointMark()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index 5be6e55..7415db3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -30,7 +30,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -70,8 +72,8 @@ public abstract class AbstractWalRecordsIterator */ @NotNull protected final GridCacheSharedContext sharedCtx; - /** Serializer of current version to read headers. */ - @NotNull private final RecordSerializer serializer; + /** Serializer factory. 
*/ + @NotNull private final RecordSerializerFactory serializerFactory; /** Factory to provide I/O interfaces for read/write operations with files */ @NotNull protected final FileIOFactory ioFactory; @@ -82,20 +84,20 @@ public abstract class AbstractWalRecordsIterator /** * @param log Logger. * @param sharedCtx Shared context. - * @param serializer Serializer of current version to read headers. + * @param serializerFactory Serializer factory. * @param ioFactory ioFactory for file IO access. * @param bufSize buffer for reading records size. */ protected AbstractWalRecordsIterator( @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, - @NotNull final RecordSerializer serializer, + @NotNull final RecordSerializerFactory serializerFactory, @NotNull final FileIOFactory ioFactory, final int bufSize ) { this.log = log; this.sharedCtx = sharedCtx; - this.serializer = serializer; + this.serializerFactory = serializerFactory; this.ioFactory = ioFactory; buf = new ByteBufferExpander(bufSize, ByteOrder.nativeOrder()); @@ -107,7 +109,7 @@ public abstract class AbstractWalRecordsIterator * @return found WAL file descriptors */ protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException { - final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); + final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER); if (files == null) { throw new IgniteCheckedException("WAL files directory does not not denote a " + @@ -156,8 +158,12 @@ public abstract class AbstractWalRecordsIterator try { curRec = advanceRecord(currWalSegment); - if (curRec != null) + if (curRec != null) { + if (curRec.get2().type() == null) + continue; // Record was skipped by filter of current serializer, should read next record.
+ return; + } else { currWalSegment = advanceSegment(currWalSegment); @@ -275,20 +281,31 @@ public abstract class AbstractWalRecordsIterator FileIO fileIO = ioFactory.create(desc.file); try { - int serVer = FileWriteAheadLogManager.readSerializerVersion(fileIO); + IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO); - RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, serVer); + int serVer = tup.get1(); + + boolean isCompacted = tup.get2(); FileInput in = new FileInput(fileIO, buf); if (start != null && desc.idx == start.index()) { - // Make sure we skip header with serializer version. - long startOffset = Math.max(start.fileOffset(), fileIO.position()); - - in.seek(startOffset); + if (isCompacted) { + serializerFactory.skipPositionCheck(true); + + if (start.fileOffset() != 0) + serializerFactory.recordDeserializeFilter(new StartSeekingFilter(start)); + } + else { + // Make sure we skip header with serializer version. + long startOff = Math.max(start.fileOffset(), fileIO.position()); + + in.seek(startOff); + } } - return new FileWriteAheadLogManager.ReadFileHandle(fileIO, desc.idx, sharedCtx.igniteInstanceName(), ser, in); + return new FileWriteAheadLogManager.ReadFileHandle( + fileIO, desc.idx, sharedCtx.igniteInstanceName(), serializerFactory.createSerializer(serVer), in); } catch (SegmentEofException | EOFException ignore) { try { @@ -320,4 +337,32 @@ public abstract class AbstractWalRecordsIterator } } + /** + * Filter that drops all records until given start pointer is reached. + */ + private static class StartSeekingFilter implements P2<WALRecord.RecordType, WALPointer> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Start pointer. */ + private final FileWALPointer start; + + /** Start reached flag. */ + private boolean startReached; + + /** + * @param start Start. 
+ */ + StartSeekingFilter(FileWALPointer start) { + this.start = start; + } + + /** {@inheritDoc} */ + @Override public boolean apply(WALRecord.RecordType type, WALPointer pointer) { + if (start.fileOffset() == ((FileWALPointer)pointer).fileOffset()) + startReached = true; + + return startReached; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java index 3b20fce..303a023 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java @@ -348,7 +348,13 @@ public final class FileInput implements ByteBufferBackedDataInput { /** {@inheritDoc} */ @Override public int skipBytes(int n) throws IOException { - throw new UnsupportedOperationException(); + ensure(n); + + int skipped = Math.min(buf.remaining(), n); + + buf.position(buf.position() + skipped); + + return skipped; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 3d8d78f..a450521 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -17,10 +17,14 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.File; import java.io.FileFilter; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -31,6 +35,8 @@ import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -40,6 +46,9 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -49,6 +58,7 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.EventType; import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; @@ -57,24 +67,28 @@ import org.apache.ignite.internal.pagemem.wal.StorageException; 
import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; -import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV1Serializer; -import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordDataV2Serializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; -import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV2Serializer; import 
org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -123,8 +137,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } }; + /** */ + private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip"); + + /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ + public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() { + @Override public boolean accept(File file) { + return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() || + WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches()); + } + }; + + /** */ + private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp"); + + /** */ + private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() { + @Override public boolean accept(File file) { + return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); + } + }; + + /** */ + private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() { + @Override public boolean accept(File file) { + return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); + } + }; + /** Latest serializer version to use. 
*/ - public static final int LATEST_SERIALIZER_VERSION = 1; + private static final int LATEST_SERIALIZER_VERSION = 2; /** */ private final boolean alwaysWriteFullPages; @@ -169,8 +211,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final int serializerVersion = IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION); - /** */ - private volatile long oldestArchiveSegmentIdx; + /** Latest segment cleared by {@link #truncate(WALPointer)}. */ + private volatile long lastTruncatedArchiveIdx = -1L; /** Factory to provide I/O interfaces for read/write operations with files */ private final FileIOFactory ioFactory; @@ -197,6 +239,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private volatile FileArchiver archiver; + /** Compressor. */ + private volatile FileCompressor compressor; + + /** Decompressor. */ + private volatile FileDecompressor decompressor; + /** */ private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>(); @@ -276,7 +324,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl "write ahead log archive directory" ); - serializer = forVersion(cctx, serializerVersion); + serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion); GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); @@ -286,10 +334,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices(); - oldestArchiveSegmentIdx = tup == null ? 0 : tup.get1(); + lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1; archiver = new FileArchiver(tup == null ? 
-1 : tup.get2()); + if (dsCfg.isWalCompactionEnabled()) { + compressor = new FileCompressor(); + + decompressor = new FileDecompressor(); + } + if (mode != WALMode.NONE) { if (log.isInfoEnabled()) log.info("Started write-ahead log manager [mode=" + mode + ']'); @@ -338,6 +392,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (archiver != null) archiver.shutdown(); + + if (compressor != null) + compressor.shutdown(); + + if (decompressor != null) + decompressor.shutdown(); } catch (Exception e) { U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e); @@ -354,8 +414,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (!cctx.kernalContext().clientNode()) { assert archiver != null; - archiver.start(); + + if (compressor != null) + compressor.start(); + + if (decompressor != null) + decompressor.start(); } } @@ -575,11 +640,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl (FileWALPointer)start, end, dsCfg, - serializer, + new RecordSerializerFactoryImpl(cctx), ioFactory, archiver, - log, - tlbSize + decompressor, + log ); } @@ -626,9 +691,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return {@code true} if has this index. 
*/ private boolean hasIndex(long absIdx) { - String name = FileDescriptor.fileName(absIdx); + String segmentName = FileDescriptor.fileName(absIdx); + + String zipSegmentName = FileDescriptor.fileName(absIdx) + ".zip"; - boolean inArchive = new File(walArchiveDir, name).exists(); + boolean inArchive = new File(walArchiveDir, segmentName).exists() || + new File(walArchiveDir, zipSegmentName).exists(); if (inArchive) return true; @@ -651,7 +719,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // File pointer bound: older entries will be deleted from archive FileWALPointer fPtr = (FileWALPointer)ptr; - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); int deleted = 0; @@ -671,8 +739,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl deleted++; // Bump up the oldest archive segment index. - if (oldestArchiveSegmentIdx < desc.idx) - oldestArchiveSegmentIdx = desc.idx; + if (lastTruncatedArchiveIdx < desc.idx) + lastTruncatedArchiveIdx = desc.idx; } } @@ -680,15 +748,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ + @Override public void allowCompressionUntil(WALPointer ptr) { + if (compressor != null) + compressor.allowCompressionUntil(((FileWALPointer)ptr).index()); + } + + /** {@inheritDoc} */ @Override public int walArchiveSegments() { - long oldest = oldestArchiveSegmentIdx; + long lastTruncated = lastTruncatedArchiveIdx; long lastArchived = archiver.lastArchivedAbsoluteIndex(); if (lastArchived == -1) return 0; - int res = (int)(lastArchived - oldest); + int res = (int)(lastArchived - lastTruncated); return res >= 0 ? 
res : 0; } @@ -710,7 +784,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private long lastArchivedIndex() { long lastIdx = -1; - for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) { + for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { try { long idx = Long.parseLong(file.getName().substring(0, 16)); @@ -725,27 +799,42 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Lists files in archive directory and returns the index of last archived file. + * Lists files in archive directory and returns the indices of least and last archived files. + * In case of holes, first segment after last "hole" is considered as minimum. + * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20 * - * @return The absolute index of last archived file. + * @return The absolute indices of min and max archived files. */ private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() { - long minIdx = Integer.MAX_VALUE; - long maxIdx = -1; + TreeSet<Long> archiveIndices = new TreeSet<>(); - for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) { + for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { try { long idx = Long.parseLong(file.getName().substring(0, 16)); - minIdx = Math.min(minIdx, idx); - maxIdx = Math.max(maxIdx, idx); + archiveIndices.add(idx); } catch (NumberFormatException | IndexOutOfBoundsException ignore) { - + // No-op. } } - return maxIdx == -1 ? null : F.t(minIdx, maxIdx); + if (archiveIndices.isEmpty()) + return null; + else { + Long min = archiveIndices.first(); + Long max = archiveIndices.last(); + + if (max - min == archiveIndices.size() - 1) + return F.t(min, max); // Short path. 
+ + for (Long idx : archiveIndices.descendingSet()) { + if (!archiveIndices.contains(idx - 1)) + return F.t(idx, max); + } + + throw new IllegalStateException("Should never happen if TreeSet is valid."); + } } /** @@ -836,14 +925,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // If we have existing segment, try to read version from it. if (lastReadPtr != null) { try { - serVer = readSerializerVersion(fileIO); + serVer = readSerializerVersionAndCompactedFlag(fileIO).get1(); } catch (SegmentEofException | EOFException ignore) { serVer = serializerVersion; } } - RecordSerializer ser = forVersion(cctx, serVer); + RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer); if (log.isInfoEnabled()) log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + @@ -1021,37 +1110,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); } - /** - * @param cctx Shared context. - * @param ver Serializer version. - * @return Entry serializer. - */ - public static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { - return forVersion(cctx, ver, false); - } - - /** - * @param ver Serializer version. - * @return Entry serializer. 
- */ - static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver, boolean writePointer) throws IgniteCheckedException { - if (ver <= 0) - throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file)."); - - switch (ver) { - case 1: - return new RecordV1Serializer(new RecordDataV1Serializer(cctx), writePointer); - - case 2: - RecordDataV2Serializer dataV2Serializer = new RecordDataV2Serializer(new RecordDataV1Serializer(cctx)); - - return new RecordV2Serializer(dataV2Serializer, writePointer); - - default: - throw new IgniteCheckedException("Failed to create a serializer with the given version " + - "(forward compatibility is not supported): " + ver); - } - } /** * @return Sorted WAL files descriptors. @@ -1103,13 +1161,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private IgniteCheckedException cleanException; /** - * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>. + * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. * Incremented during rollover. Also may be directly set if WAL is resuming logging after start. */ private long curAbsWalIdx = -1; /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */ - private long lastAbsArchivedIdx = -1; + private volatile long lastAbsArchivedIdx = -1; /** current thread stopping advice */ private volatile boolean stopped; @@ -1135,7 +1193,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @return Last archived segment absolute index. 
*/ - private synchronized long lastArchivedAbsoluteIndex() { + private long lastArchivedAbsoluteIndex() { return lastAbsArchivedIdx; } @@ -1221,7 +1279,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl wait(); if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1) - lastAbsArchivedIdx = curAbsWalIdx - 1; + changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1); } while (!Thread.currentThread().isInterrupted() && !stopped) { @@ -1252,7 +1310,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl formatFile(res.getOrigWorkFile()); // Then increase counter to allow rollover on clean working file - lastAbsArchivedIdx = toArchive; + changeLastArchivedIndexAndWakeupCompressor(toArchive); notifyAll(); } @@ -1275,6 +1333,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * @param idx Index. + */ + private void changeLastArchivedIndexAndWakeupCompressor(long idx) { + lastAbsArchivedIdx = idx; + + if (compressor != null) + compressor.onNextSegmentArchived(); + } + + /** * Gets the absolute index of the next WAL segment available to write. * Blocks till there are available file to write * @@ -1301,7 +1369,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return curAbsWalIdx; } } - catch (InterruptedException e) { + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IgniteInterruptedCheckedException(e); @@ -1426,6 +1494,314 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Responsible for compressing WAL archive segments. + * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved. + */ + private class FileCompressor extends Thread { + /** Current thread stopping advice. */ + private volatile boolean stopped; + + /** Last successfully compressed segment. 
*/ + private volatile long lastCompressedIdx = -1L; + + /** All segments prior to this (inclusive) can be compressed. */ + private volatile long lastAllowedToCompressIdx = -1L; + + /** + * + */ + FileCompressor() { + super("wal-file-compressor%" + cctx.igniteInstanceName()); + } + + /** + * + */ + private void init() { + File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); + + for (File f : toDel) { + if (stopped) + return; + + f.delete(); + } + + FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); + + if (alreadyCompressed.length > 0) + lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx(); + } + + /** + * @param lastCpStartIdx Segment index to allow compression until (exclusively). + */ + synchronized void allowCompressionUntil(long lastCpStartIdx) { + lastAllowedToCompressIdx = lastCpStartIdx - 1; + + notify(); + } + + /** + * Callback for waking up compressor when new segment is archived. + */ + synchronized void onNextSegmentArchived() { + notify(); + } + + /** + * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. + * Waits if there's no segment to archive right now. + */ + private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException { + long segmentToCompress = lastCompressedIdx + 1; + + synchronized (this) { + while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) { + wait(); + + if (stopped) + return -1; + } + } + + segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1); + + boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); + + return reserved ? 
segmentToCompress : -1; + } + + /** + * + */ + private void deleteObsoleteRawSegments() { + FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + + FileArchiver archiver0 = archiver; + + for (FileDescriptor desc : descs) { + // Do not delete reserved or locked segment and any segment after it. + if (archiver0 != null && archiver0.reserved(desc.idx)) + return; + + if (desc.idx < lastCompressedIdx) { + if (!desc.file.delete()) + U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + + desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); + } + } + } + + /** {@inheritDoc} */ + @Override public void run() { + init(); + + while (!Thread.currentThread().isInterrupted() && !stopped) { + try { + deleteObsoleteRawSegments(); + + long nextSegment = tryReserveNextSegmentOrWait(); + if (nextSegment == -1) + continue; + + File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp"); + + File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip"); + + File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment)); + if (!Files.exists(raw.toPath())) + throw new IgniteCheckedException("WAL archive segment is missing: " + raw); + + compressSegmentToFile(nextSegment, raw, tmpZip); + + Files.move(tmpZip.toPath(), zip.toPath()); + + if (mode == WALMode.DEFAULT) { + try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { + f0.force(); + } + } + + lastCompressedIdx = nextSegment; + } + catch (IgniteCheckedException | IOException e) { + U.error(log, "Unexpected error during WAL compression", e); + + FileWriteHandle handle = currentHandle(); + + if (handle != null) + handle.invalidateEnvironment(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * @param nextSegment Next segment absolute idx. + * @param raw Raw file. + * @param zip Zip file. 
+ */ + private void compressSegmentToFile(long nextSegment, File raw, File zip) + throws IOException, IgniteCheckedException { + int segmentSerializerVer; + + try (FileIO fileIO = ioFactory.create(raw)) { + IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO); + + segmentSerializerVer = tup.get1(); + } + + try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { + zos.putNextEntry(new ZipEntry("")); + + zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array()); + + final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() { + @Override public void applyx(WALRecord record) throws IgniteCheckedException { + final MarshalledRecord marshRec = (MarshalledRecord)record; + + try { + zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + }; + + try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( + log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) { + + while (iter.hasNextX()) + iter.nextX(); + } + } + finally { + release(new FileWALPointer(nextSegment, 0, 0)); + } + } + + /** + * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. + */ + private void shutdown() throws IgniteInterruptedCheckedException { + synchronized (this) { + stopped = true; + + notifyAll(); + } + + U.join(this); + } + } + + /** + * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay. + */ + private class FileDecompressor extends Thread { + /** Current thread stopping advice. */ + private volatile boolean stopped; + + /** Decompression futures. */ + private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>(); + + /** Segments queue. 
*/ + private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>(); + + /** Byte array for draining data. */ + private byte[] arr = new byte[tlbSize]; + + /** + * + */ + FileDecompressor() { + super("wal-file-decompressor%" + cctx.igniteInstanceName()); + } + + /** {@inheritDoc} */ + @Override public void run() { + while (!Thread.currentThread().isInterrupted() && !stopped) { + try { + long segmentToDecompress = segmentsQueue.take(); + + if (stopped) + break; + + File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); + File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); + File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); + + try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); + FileIO io = ioFactory.create(unzipTmp)) { + zis.getNextEntry(); + + int bytesRead; + while ((bytesRead = zis.read(arr)) > 0) + io.write(arr, 0, bytesRead); + } + + Files.move(unzipTmp.toPath(), unzip.toPath()); + + synchronized (this) { + decompressionFutures.remove(segmentToDecompress).onDone(); + } + } + catch (InterruptedException e){ + Thread.currentThread().interrupt(); + } + catch (IOException e) { + U.error(log, "Unexpected error during WAL decompression", e); + + FileWriteHandle handle = currentHandle(); + + if (handle != null) + handle.invalidateEnvironment(e); + } + } + } + + /** + * Asynchronously decompresses WAL segment which is present only in .zip file. + * + * @return Future which is completed once file is decompressed. 
+ */ + synchronized IgniteInternalFuture<Void> decompressFile(long idx) { + if (decompressionFutures.containsKey(idx)) + return decompressionFutures.get(idx); + + File f = new File(walArchiveDir, FileDescriptor.fileName(idx)); + + if (f.exists()) + return new GridFinishedFuture<>(); + + segmentsQueue.put(idx); + + GridFutureAdapter<Void> res = new GridFutureAdapter<>(); + + decompressionFutures.put(idx, res); + + return res; + } + + /** + * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. + */ + private void shutdown() throws IgniteInterruptedCheckedException { + synchronized (this) { + stopped = true; + + // Put fake -1 to wake thread from queue.take() + segmentsQueue.put(-1L); + } + + U.join(this); + } + } + + /** * Validate files depending on {@link DataStorageConfiguration#getWalSegments()} and create if need. * Check end when exit condition return false or all files are passed. * @@ -1452,14 +1828,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Reads record serializer version from provided {@code io}. + * Reads record serializer version from provided {@code io} along with compacted flag. * NOTE: Method mutates position of {@code io}. * * @param io I/O interface for file. * @return Serializer version stored in the file. * @throws IgniteCheckedException If failed to read serializer version. 
*/ - public static int readSerializerVersion(FileIO io) + public static IgniteBiTuple<Integer, Boolean> readSerializerVersionAndCompactedFlag(FileIO io) throws IgniteCheckedException, IOException { try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) { FileInput in = new FileInput(io, buf); @@ -1481,19 +1857,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr; - long headerMagicNumber = in.readLong(); + long hdrMagicNum = in.readLong(); - if (headerMagicNumber != HeaderRecord.MAGIC) - throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.MAGIC) + - ", actual=" + U.hexLong(headerMagicNumber) + ']'); + boolean compacted; + if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC) + compacted = false; + else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC) + compacted = true; + else { + throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) + + ", actual=" + U.hexLong(hdrMagicNum) + ']'); + } // Read serializer version. - int version = in.readInt(); + int ver = in.readInt(); // Read and skip CRC. in.readInt(); - return version; + return new IgniteBiTuple<>(ver, compacted); } } @@ -1508,47 +1890,58 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IOException If failed to write serializer version. */ public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); - buffer.order(ByteOrder.nativeOrder()); + ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false); + + do { + io.write(buffer); + } + while (buffer.hasRemaining()); + + // Flush + io.force(); + + return io.position(); + } + + /** + * @param idx Index. + * @param ver Version. 
+ * @param compacted Compacted flag. + */ + @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) { + ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); + buf.order(ByteOrder.nativeOrder()); // Write record type. - buffer.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); + buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); // Write position. - RecordV1Serializer.putPosition(buffer, new FileWALPointer(idx, 0, 0)); + RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0)); // Place magic number. - buffer.putLong(HeaderRecord.MAGIC); + buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC); // Place serializer version. - buffer.putInt(version); + buf.putInt(ver); // Place CRC if needed. if (!RecordV1Serializer.SKIP_CRC) { - int curPos = buffer.position(); + int curPos = buf.position(); - buffer.position(0); + buf.position(0); // This call will move buffer position to the end of the record again. - int crcVal = PureJavaCrc32.calcCrc32(buffer, curPos); + int crcVal = PureJavaCrc32.calcCrc32(buf, curPos); - buffer.putInt(crcVal); + buf.putInt(crcVal); } else - buffer.putInt(0); + buf.putInt(0); // Write header record through io. - buffer.position(0); - - do { - io.write(buffer); - } - while (buffer.hasRemaining()); + buf.position(0); - // Flush - io.force(); - - return io.position(); + return buf; } /** @@ -1579,11 +1972,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl String fileName = file.getName(); - assert fileName.endsWith(WAL_SEGMENT_FILE_EXT); - - int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length(); + assert fileName.contains(WAL_SEGMENT_FILE_EXT); - this.idx = idx == null ? Long.parseLong(fileName.substring(0, end)) : idx; + this.idx = idx == null ? 
Long.parseLong(fileName.substring(0, 16)) : idx; } /** @@ -2193,17 +2584,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert stopped() : "Segment is not closed after close flush: " + head.get(); try { - int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE + RecordV1Serializer.CRC_SIZE; + RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) + .createSerializer(serializerVersion); - if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { - RecordV1Serializer backwardSerializer = - new RecordV1Serializer(new RecordDataV1Serializer(cctx), true); + SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); - final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); + int switchSegmentRecSize = backwardSerializer.size(segmentRecord); - SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); - segmentRecord.position( new FileWALPointer(idx, (int)written, -1)); - backwardSerializer.writeRecord(segmentRecord,buf); + if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { + final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); + + segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize)); + backwardSerializer.writeRecord(segmentRecord, buf); buf.rewind(); @@ -2487,6 +2879,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final FileArchiver archiver; /** */ + private final FileDecompressor decompressor; + + /** */ private final DataStorageConfiguration psCfg; /** Optional start pointer. */ @@ -2504,10 +2899,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param start Optional start pointer. * @param end Optional end pointer. * @param psCfg Database configuration. - * @param serializer Serializer of current version to read headers. + * @param serializerFactory Serializer factory. 
* @param archiver Archiver. - * @param log Logger - * @throws IgniteCheckedException If failed to initialize WAL segment. + * @param decompressor Decompressor. + *@param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. */ private RecordsIterator( GridCacheSharedContext cctx, @@ -2516,15 +2911,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Nullable FileWALPointer start, @Nullable FileWALPointer end, DataStorageConfiguration psCfg, - @NotNull RecordSerializer serializer, + @NotNull RecordSerializerFactory serializerFactory, FileIOFactory ioFactory, FileArchiver archiver, - IgniteLogger log, - int tlbSize + FileDecompressor decompressor, + IgniteLogger log ) throws IgniteCheckedException { super(log, cctx, - serializer, + serializerFactory, ioFactory, psCfg.getWalRecordIteratorBufferSize()); this.walWorkDir = walWorkDir; @@ -2533,6 +2928,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl this.archiver = archiver; this.start = start; this.end = end; + this.decompressor = decompressor; init(); @@ -2540,6 +2936,26 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ + @Override protected ReadFileHandle initReadHandle( + @NotNull FileDescriptor desc, + @Nullable FileWALPointer start + ) throws IgniteCheckedException, FileNotFoundException { + if (decompressor != null && !desc.file.exists()) { + FileDescriptor zipFile = new FileDescriptor( + new File(walArchiveDir, FileDescriptor.fileName(desc.getIdx()) + ".zip")); + + if (!zipFile.file.exists()) { + throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + + "[segmentIdx=" + desc.idx + "]"); + } + + decompressor.decompressFile(desc.idx).get(); + } + + return super.initReadHandle(desc, start); + } + + /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { super.onClose(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java deleted file mode 100644 index 5a14095..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordDataSerializer.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.wal; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.pagemem.wal.record.WALRecord; - -/** - * Interface to provide size, read and write operations with WAL records - * <b>without any headers and meta information</b>. - */ -public interface RecordDataSerializer { - /** - * Calculates size of record data. - * - * @param record WAL record. 
- * @return Size of record in bytes. - * @throws IgniteCheckedException If it's unable to calculate record data size. - */ - int size(WALRecord record) throws IgniteCheckedException; - - /** - * Reads record data of {@code type} from buffer {@code in}. - * - * @param type Record type. - * @param in Buffer to read. - * @return WAL record. - * @throws IOException In case of I/O problems. - * @throws IgniteCheckedException If it's unable to read record. - */ - WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException; - - /** - * Writes record data to buffer {@code buf}. - * - * @param record WAL record. - * @param buf Buffer to write. - * @throws IgniteCheckedException If it's unable to write record. - */ - void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java deleted file mode 100644 index 12e16a8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.wal; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.pagemem.wal.WALPointer; -import org.apache.ignite.internal.pagemem.wal.record.WALRecord; - -/** - * Record serializer. - */ -public interface RecordSerializer { - /** - * @return serializer version - */ - public int version(); - - /** - * Calculates record size in byte including expected wal pointer, CRC and type field - * - * @param record Record. - * @return Size in bytes. - */ - public int size(WALRecord record) throws IgniteCheckedException; - - /** - * @param record Entry to write. - * @param buf Buffer. - */ - public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; - - /** - * Loads record from input - * - * @param in Data input to read data from. - * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file - * @return Read entry. 
- */ - public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; - - /** - * Flag to write (or not) wal pointer to record - */ - public boolean writePointer(); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f50b2354/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java new file mode 100644 index 0000000..4a846b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java @@ -0,0 +1,141 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.record.RecordTypes; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; +import org.apache.ignite.internal.util.typedef.CIX1; +import org.apache.ignite.internal.util.typedef.P2; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Iterates over logical records of one WAL segment from archive. Used for WAL archive compression. + * Doesn't deserialize actual record data, returns {@link MarshalledRecord} instances instead. + */ +public class SingleSegmentLogicalRecordsIterator extends AbstractWalRecordsIterator { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Segment initialized flag. */ + private boolean segmentInitialized; + + /** Archived segment index. */ + private long archivedSegIdx; + + /** Archive directory. */ + private File archiveDir; + + /** Closure which is executed right after advance. */ + private CIX1<WALRecord> advanceC; + + /** + * @param log Logger. + * @param sharedCtx Shared context. + * @param ioFactory Io factory. + * @param bufSize Buffer size. + * @param archivedSegIdx Archived seg index. + * @param archiveDir Directory with segment. 
+ * @param advanceC Closure which is executed right after advance. + */ + SingleSegmentLogicalRecordsIterator( + @NotNull IgniteLogger log, + @NotNull GridCacheSharedContext sharedCtx, + @NotNull FileIOFactory ioFactory, + int bufSize, + long archivedSegIdx, + File archiveDir, + CIX1<WALRecord> advanceC + ) throws IgniteCheckedException { + super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), ioFactory, bufSize); + + this.archivedSegIdx = archivedSegIdx; + this.archiveDir = archiveDir; + this.advanceC = advanceC; + + advance(); + } + + /** + * @param sharedCtx Shared context. + */ + private static RecordSerializerFactory initLogicalRecordsSerializerFactory(GridCacheSharedContext sharedCtx) + throws IgniteCheckedException { + + return new RecordSerializerFactoryImpl(sharedCtx) + .recordDeserializeFilter(new LogicalRecordsFilter()) + .marshalledMode(true); + } + + /** {@inheritDoc} */ + @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment( + @Nullable FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException { + if (segmentInitialized) { + closeCurrentWalSegment(); + // No advance as we iterate over a single segment. + return null; + } + else { + segmentInitialized = true; + + FileWriteAheadLogManager.FileDescriptor fd = new FileWriteAheadLogManager.FileDescriptor( + new File(archiveDir, FileWriteAheadLogManager.FileDescriptor.fileName(archivedSegIdx))); + + try { + return initReadHandle(fd, null); + } + catch (FileNotFoundException e) { + throw new IgniteCheckedException("Missing WAL segment in the archive", e); + } + } + } + + /** {@inheritDoc} */ + @Override protected void advance() throws IgniteCheckedException { + super.advance(); + + if (curRec != null && advanceC != null) + advanceC.apply(curRec.get2()); + } + + /** + * Predicate that returns {@code true} for every record type not contained in {@link RecordTypes#DELTA_TYPE_SET}. + */ + private static class LogicalRecordsFilter implements P2<WALRecord.RecordType, WALPointer> { + /** Serial version uid. 
*/ + private static final long serialVersionUID = 0L; + + /** Record types to skip. */ + private final Set<WALRecord.RecordType> skip = RecordTypes.DELTA_TYPE_SET; + + /** {@inheritDoc} */ + @Override public boolean apply(WALRecord.RecordType type, WALPointer ptr) { + return !skip.contains(type); + } + } +}
