This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-18540 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit b1a355a2b4df6082da4d93a11d9253eacd9af036 Author: Aleksey Plekhanov <[email protected]> AuthorDate: Mon Apr 29 13:21:49 2019 +0300 GG-17480 Implement WAL page snapshot records compression - Fixes #6116. Signed-off-by: Dmitriy Govorukhin <[email protected]> (cherry picked from commit 2ad6c85) --- modules/compress/pom.xml | 15 ++ .../compress/CompressionProcessorImpl.java | 100 +++++++--- ...ionWithRealCpDisabledAndWalCompressionTest.java | 37 ++++ .../wal/WalCompactionAndPageCompressionTest.java | 35 ++++ .../WalRecoveryWithPageCompressionAndTdeTest.java | 51 ++++++ .../db/wal/WalRecoveryWithPageCompressionTest.java | 36 ++++ .../AbstractPageCompressionIntegrationTest.java | 201 +++++++++++++++++++++ .../DiskPageCompressionIntegrationTest.java | 171 +----------------- .../WalPageCompressionIntegrationTest.java | 101 +++++++++++ .../testsuites/IgnitePdsCompressionTestSuite.java | 12 ++ .../ignite/configuration/CacheConfiguration.java | 5 +- .../configuration/DataStorageConfiguration.java | 53 ++++++ .../ignite/configuration/DiskPageCompression.java | 5 + .../internal/pagemem/wal/record/PageSnapshot.java | 76 ++++++-- .../processors/cache/CacheCompressionManager.java | 10 +- .../GridCacheDatabaseSharedManager.java | 9 + .../cache/persistence/file/FilePageStore.java | 4 + .../cache/persistence/pagemem/PageMemoryImpl.java | 11 +- .../cache/persistence/tree/io/PageIO.java | 38 +++- .../persistence/wal/FileWriteAheadLogManager.java | 43 +++++ .../wal/serializer/RecordDataSerializer.java | 3 +- .../wal/serializer/RecordDataV1Serializer.java | 17 +- .../wal/serializer/RecordDataV2Serializer.java | 17 +- .../wal/serializer/RecordV1Serializer.java | 2 +- .../wal/serializer/RecordV2Serializer.java | 3 +- .../processors/compress/CompressionProcessor.java | 11 +- .../platform/utils/PlatformConfigurationUtils.java | 15 ++ .../processors/cache/persistence/DummyPageIO.java | 4 + ...CheckpointSimulationWithRealCpDisabledTest.java | 25 ++- .../persistence/db/wal/WalCompactionTest.java | 26 ++- .../wal/memtracker/PageMemoryTracker.java | 3 +- .../persistence/db/wal/IgniteWalRecoveryTest.java | 71 ++++++-- .../Config/full-config.xml | 3 +- .../IgniteConfigurationSerializerTest.cs | 3 + .../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 1 + .../Configuration/DataStorageConfiguration.cs | 22 ++- .../Configuration/DiskPageCompression.cs | 50 +++++ .../IgniteConfigurationSection.xsd | 20 ++ 38 files changed, 1049 insertions(+), 260 deletions(-) diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml index a7f77de..bdcd924 100644 --- a/modules/compress/pom.xml +++ b/modules/compress/pom.xml @@ -42,6 +42,21 @@ </dependency> <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <type>test-jar</type> + <scope>test</scope> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-indexing</artifactId> + <scope>test</scope> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.github.jnr</groupId> <artifactId>jnr-posix</artifactId> <version>${jnr.posix.version}</version> diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java index a89f67a..6d1b95f 100644 --- a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java @@ -27,8 +27,10 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.xerial.snappy.Snappy; @@ -63,6 +65,11 @@ public class CompressionProcessorImpl extends CompressionProcessor { } /** {@inheritDoc} */ + @Override public void checkPageCompressionSupported() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException { if (!U.isLinux()) throw new IgniteCheckedException("Currently page compression is supported only for Linux."); @@ -92,46 +99,74 @@ public class CompressionProcessorImpl extends CompressionProcessor { DiskPageCompression compression, int compressLevel ) throws IgniteCheckedException { - assert compression != null; - assert U.isPow2(pageSize): pageSize; + assert compression != null && compression != DiskPageCompression.DISABLED : compression; assert U.isPow2(blockSize): blockSize; - assert page.position() == 0 && page.limit() == pageSize; + assert page.position() == 0 && page.limit() >= pageSize; - PageIO io = PageIO.getPageIO(page); + int oldPageLimit = page.limit(); - if (!(io instanceof CompactablePageIO)) - return page; + try { + // Page size will be less than page limit when TDE is enabled. To make compaction and compression work + // correctly we need to set limit to real page size. + page.limit(pageSize); - ByteBuffer compactPage = compactBuf.get(); + ByteBuffer compactPage = doCompactPage(page, pageSize); - // Drop the garbage from the page. - ((CompactablePageIO)io).compactPage(page, compactPage, pageSize); - page.clear(); + int compactSize = compactPage.limit(); - int compactSize = compactPage.limit(); + assert compactSize <= pageSize : compactSize; - assert compactSize <= pageSize: compactSize; + // If no need to compress further or configured just to skip garbage. + if (compactSize < blockSize || compression == SKIP_GARBAGE) + return setCompactionInfo(compactPage, compactSize); + + ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel); + + assert compressedPage.position() == 0; + int compressedSize = compressedPage.limit(); + + int freeCompactBlocks = (pageSize - compactSize) / blockSize; + int freeCompressedBlocks = (pageSize - compressedSize) / blockSize; + + if (freeCompactBlocks >= freeCompressedBlocks) { + if (freeCompactBlocks == 0) + return page; // No blocks will be released. + + return setCompactionInfo(compactPage, compactSize); + } - // If no need to compress further or configured just to skip garbage. - if (compactSize < blockSize || compression == SKIP_GARBAGE) - return setCompactionInfo(compactPage, compactSize); + return setCompressionInfo(compressedPage, compression, compressedSize, compactSize); + } + finally { + page.limit(oldPageLimit); + } + } - ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel); + /** + * @param page Page buffer. + * @param pageSize Page size. + * @return Compacted page buffer. + */ + private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { + PageIO io = PageIO.getPageIO(page); - assert compressedPage.position() == 0; - int compressedSize = compressedPage.limit(); + ByteBuffer compactPage = compactBuf.get(); - int freeCompactBlocks = (pageSize - compactSize) / blockSize; - int freeCompressedBlocks = (pageSize - compressedSize) / blockSize; + if (io instanceof CompactablePageIO) { + // Drop the garbage from the page. + ((CompactablePageIO)io).compactPage(page, compactPage, pageSize); + } + else { + // Direct buffer is required as output of this method. + if (page.isDirect()) + return page; - if (freeCompactBlocks >= freeCompressedBlocks) { - if (freeCompactBlocks == 0) - return page; // No blocks will be released. + PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, page.array()); - return setCompactionInfo(compactPage, compactSize); + compactPage.limit(pageSize); } - return setCompressionInfo(compressedPage, compression, compressedSize, compactSize); + return compactPage; } /** @@ -260,7 +295,7 @@ public class CompressionProcessorImpl extends CompressionProcessor { * @return Level. */ private static byte getCompressionType(DiskPageCompression compression) { - if (compression == null) + if (compression == DiskPageCompression.DISABLED) return UNCOMPRESSED_PAGE; switch (compression) { @@ -281,7 +316,7 @@ public class CompressionProcessorImpl extends CompressionProcessor { /** {@inheritDoc} */ @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { - assert page.capacity() == pageSize; + assert page.capacity() >= pageSize : "capacity=" + page.capacity() + ", pageSize=" + pageSize; byte compressType = PageIO.getCompressionType(page); @@ -338,11 +373,16 @@ public class CompressionProcessorImpl extends CompressionProcessor { assert page.limit() == compactSize; } - CompactablePageIO io = PageIO.getPageIO(page); + PageIO io = PageIO.getPageIO(page); - io.restorePage(page, pageSize); + if (io instanceof CompactablePageIO) + ((CompactablePageIO)io).restorePage(page, pageSize); + else { + assert compactSize == pageSize + : "Wrong compacted page size [compactSize=" + compactSize + ", pageSize=" + pageSize + ']'; + } - setCompressionInfo(page, null, 0, 0); + setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0); } /** */ diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java new file mode 100644 index 0000000..48416b4 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.java @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest; + +/** + * + */ +public class IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest + extends IgnitePdsCheckpointSimulationWithRealCpDisabledTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY); + + return cfg; + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java new file mode 100644 index 0000000..01e2040 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAndPageCompressionTest.java @@ -0,0 +1,35 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class WalCompactionAndPageCompressionTest extends WalCompactionTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getDataStorageConfiguration().setWalPageCompression(DiskPageCompression.SNAPPY); + + return cfg; + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java new file mode 100644 index 0000000..07c1312 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionAndTdeTest.java @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.encryption.AbstractEncryptionTest; +import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; + +/** + * + */ +public class WalRecoveryWithPageCompressionAndTdeTest extends WalRecoveryWithPageCompressionTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + for (CacheConfiguration<?, ?> ccfg : cfg.getCacheConfiguration()) + ccfg.setEncryptionEnabled(true); + + KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi(); + + encSpi.setKeyStorePath(AbstractEncryptionTest.KEYSTORE_PATH); + encSpi.setKeyStorePassword(AbstractEncryptionTest.KEYSTORE_PASSWORD.toCharArray()); + + cfg.setEncryptionSpi(encSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override public void testWalRenameDirSimple() throws Exception { + // Ignore this test when TDE is enabled, since there is internal cache group id change without corresponding + // encryption keys change. + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java new file mode 100644 index 0000000..aaf4b25 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRecoveryWithPageCompressionTest.java @@ -0,0 +1,36 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.testframework.junits.WithSystemProperty; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION; + +/** + * WAL recovery test with WAL page compression enabled and PDS disk page compression disabled. + */ +@WithSystemProperty(key = IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, value = "DISABLED") +public class WalRecoveryWithPageCompressionTest extends IgniteWalRecoveryTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + walPageCompression = DiskPageCompression.ZSTD; + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java new file mode 100644 index 0000000..31d838d --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/AbstractPageCompressionIntegrationTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.io.Serializable; +import java.util.UUID; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.configuration.DiskPageCompression.LZ4; +import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; +import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY; +import static org.apache.ignite.configuration.DiskPageCompression.ZSTD; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL; + +/** + * + */ +public abstract class AbstractPageCompressionIntegrationTest extends GridCommonAbstractTest { + /** */ + protected DiskPageCompression compression; + + /** */ + protected Integer compressionLevel; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + compression = DiskPageCompression.DISABLED; + compressionLevel = null; + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Zstd_Max() throws Exception { + compression = ZSTD; + compressionLevel = ZSTD_MAX_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Zstd_Default() throws Exception { + compression = ZSTD; + compressionLevel = null; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Zstd_Min() throws Exception { + compression = ZSTD; + compressionLevel = ZSTD_MIN_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Lz4_Max() throws Exception { + compression = LZ4; + compressionLevel = LZ4_MAX_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Lz4_Default() throws Exception { + compression = LZ4; + compressionLevel = null; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Lz4_Min() throws Exception { + assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_SkipGarbage() throws Exception { + compression = SKIP_GARBAGE; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPageCompression_Snappy() throws Exception { + compression = SNAPPY; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + protected abstract void doTestPageCompression() throws Exception; + + /** + * + */ + static class TestVal implements Serializable { + /** */ + static final long serialVersionUID = 1L; + + /** */ + @QuerySqlField + String str; + + /** */ + int i; + + /** */ + @QuerySqlField + long x; + + /** */ + @QuerySqlField + UUID id; + + TestVal(int i) { + this.str = i + "bla bla bla!"; + this.i = -i; + this.x = 0xffaabbccdd773311L + i; + this.id = new UUID(i,-i); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestVal testVal = (TestVal)o; + + if (i != testVal.i) return false; + if (x != testVal.x) return false; + if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false; + return id != null ? id.equals(testVal.id) : testVal.id == null; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = str != null ? str.hashCode() : 0; + result = 31 * result + i; + result = 31 * result + (int)(x ^ (x >>> 32)); + result = 31 * result + (id != null ? id.hashCode() : 0); + return result; + } + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java index 0158828..865ab22 100644 --- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java @@ -18,21 +18,17 @@ package org.apache.ignite.internal.processors.compress; import java.io.File; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.file.OpenOption; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.pagemem.store.PageStore; @@ -46,47 +42,19 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Test; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; -import static org.apache.ignite.configuration.DiskPageCompression.LZ4; -import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; -import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY; import static org.apache.ignite.configuration.DiskPageCompression.ZSTD; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL; /** * */ -public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest { - /** */ - private DiskPageCompression compression; - - /** */ - private Integer compressionLevel; - +public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest { /** */ private FileIOFactory factory; /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - compression = null; - compressionLevel = null; - cleanPersistenceDir(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception { DataRegionConfiguration drCfg = new DataRegionConfiguration() .setPersistenceEnabled(true); @@ -112,90 +80,8 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - @Test - public void testPageCompression_Zstd_Max() throws Exception { - compression = ZSTD; - compressionLevel = ZSTD_MAX_LEVEL; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Zstd_Default() throws Exception { - compression = ZSTD; - compressionLevel = null; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Zstd_Min() throws Exception { - compression = ZSTD; - compressionLevel = ZSTD_MIN_LEVEL; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Lz4_Max() throws Exception { - compression = LZ4; - compressionLevel = LZ4_MAX_LEVEL; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Lz4_Default() throws Exception { - compression = LZ4; - compressionLevel = null; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Lz4_Min() throws Exception { - assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_SkipGarbage() throws Exception { - compression = SKIP_GARBAGE; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPageCompression_Snappy() throws Exception { - compression = SNAPPY; - - doTestPageCompression(); - } - - /** - * @throws Exception If failed. - */ - private void doTestPageCompression() throws Exception { + @Override + protected void doTestPageCompression() throws Exception { IgniteEx ignite = startGrid(0); ignite.cluster().active(true); @@ -342,57 +228,6 @@ public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest { /** */ - static class TestVal implements Serializable { - /** */ - static final long serialVersionUID = 1L; - - /** */ - @QuerySqlField - String str; - - /** */ - int i; - - /** */ - @QuerySqlField - long x; - - /** */ - @QuerySqlField - UUID id; - - TestVal(int i) { - this.str = i + "bla bla bla!"; - this.i = -i; - this.x = 0xffaabbccdd773311L + i; - this.id = new UUID(i,-i); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TestVal testVal = (TestVal)o; - - if (i != testVal.i) return false; - if (x != testVal.x) return false; - if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false; - return id != null ? id.equals(testVal.id) : testVal.id == null; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = str != null ? str.hashCode() : 0; - result = 31 * result + i; - result = 31 * result + (int)(x ^ (x >>> 32)); - result = 31 * result + (id != null ? id.hashCode() : 0); - return result; - } - } - - /** - */ static class PunchFileIO extends FileIODecorator { /** */ private ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>(); diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java new file mode 100644 index 0000000..c16f9e4 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/WalPageCompressionIntegrationTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; + +/** + * + */ +public class WalPageCompressionIntegrationTest extends AbstractPageCompressionIntegrationTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception { + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)) + .setWalPageCompression(compression) + .setWalPageCompressionLevel(compressionLevel); + + return super.getConfiguration(igniteName) + .setDataStorageConfiguration(dsCfg) + // Set new IP finder for each node to start independent clusters. + .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder(true))); + } + + /** + * @throws Exception If failed. + */ + @Override + protected void doTestPageCompression() throws Exception { + // Ignite instance with compressed WAL page records. + IgniteEx ignite0 = startGrid(0); + + compression = DiskPageCompression.DISABLED; + compressionLevel = null; + + // Reference ignite instance with uncompressed WAL page records. + IgniteEx ignite1 = startGrid(1); + + ignite0.cluster().active(true); + ignite1.cluster().active(true); + + String cacheName = "test"; + + CacheConfiguration<Integer, TestVal> ccfg = new CacheConfiguration<Integer, TestVal>() + .setName(cacheName) + .setBackups(0) + .setAtomicityMode(ATOMIC) + .setIndexedTypes(Integer.class, TestVal.class); + + IgniteCache<Integer,TestVal> cache0 = ignite0.getOrCreateCache(ccfg); + IgniteCache<Integer,TestVal> cache1 = ignite1.getOrCreateCache(ccfg); + + int cnt = 20_000; + + for (int i = 0; i < cnt; i++) { + assertTrue(cache0.putIfAbsent(i, new TestVal(i))); + assertTrue(cache1.putIfAbsent(i, new TestVal(i))); + } + + for (int i = 0; i < cnt; i += 2) { + assertEquals(new TestVal(i), cache0.getAndRemove(i)); + assertEquals(new TestVal(i), cache1.getAndRemove(i)); + } + + // Write any WAL record to get current WAL pointers. + FileWALPointer ptr0 = (FileWALPointer)ignite0.context().cache().context().wal().log(new CheckpointRecord(null)); + FileWALPointer ptr1 = (FileWALPointer)ignite1.context().cache().context().wal().log(new CheckpointRecord(null)); + + log.info("Compressed WAL pointer: " + ptr0); + log.info("Uncompressed WAL pointer: " + ptr1); + + assertTrue("Compressed WAL must be smaller than uncompressed [ptr0=" + ptr0 + ", ptr1=" + ptr1 + ']', + ptr0.compareTo(ptr1) < 0); + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java index b38abc0..efbdd14 100644 --- a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java +++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java @@ -18,11 +18,16 @@ package org.apache.ignite.testsuites; import java.util.ArrayList; import java.util.List; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionTest; import org.apache.ignite.internal.processors.compress.CompressionConfigurationTest; import org.apache.ignite.internal.processors.compress.CompressionProcessorTest; import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationAsyncTest; import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest; import org.apache.ignite.internal.processors.compress.FileSystemUtilsTest; +import org.apache.ignite.internal.processors.compress.WalPageCompressionIntegrationTest; import org.apache.ignite.testframework.junits.DynamicSuite; import org.junit.runner.RunWith; @@ -45,6 +50,13 @@ public class IgnitePdsCompressionTestSuite { suite.add(DiskPageCompressionIntegrationTest.class); suite.add(DiskPageCompressionIntegrationAsyncTest.class); + // WAL page records compression. + suite.add(WalPageCompressionIntegrationTest.class); + suite.add(WalRecoveryWithPageCompressionTest.class); + suite.add(WalRecoveryWithPageCompressionAndTdeTest.class); + suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class); + suite.add(WalCompactionAndPageCompressionTest.class); + enableCompressionByDefault(); IgnitePdsTestSuite.addRealPageStoreTests(suite, null); diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 800b41c..1df064e 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -190,6 +190,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default SQL on-heap cache size. */ public static final int DFLT_SQL_ONHEAP_CACHE_MAX_SIZE = 0; + /** Default disk page compression algorithm. */ + public static final DiskPageCompression DFLT_DISK_PAGE_COMPRESSION = DiskPageCompression.DISABLED; + /** Cache name. */ private String name; @@ -2318,7 +2321,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { * @see #getDiskPageCompressionLevel */ public DiskPageCompression getDiskPageCompression() { - return diskPageCompression; + return diskPageCompression == null ? DFLT_DISK_PAGE_COMPRESSION : diskPageCompression; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index c2cb8bc..d10bda1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -166,6 +166,9 @@ public class DataStorageConfiguration implements Serializable { /** Default wal compaction level. */ public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED; + /** Default compression algorithm for WAL page snapshot records. */ + public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED; + /** Initial size of a memory chunk reserved for system cache. */ private long sysRegionInitSize = DFLT_SYS_REG_INIT_SIZE; @@ -289,6 +292,12 @@ public class DataStorageConfiguration implements Serializable { /** Timeout for checkpoint read lock acquisition. */ private Long checkpointReadLockTimeout; + /** Compression algorithm for WAL page snapshot records. */ + private DiskPageCompression walPageCompression = DFLT_WAL_PAGE_COMPRESSION; + + /** Compression level for WAL page snapshot records. */ + private Integer walPageCompressionLevel; + /** * Initial size of a data region reserved for system cache. * @@ -1020,6 +1029,50 @@ public class DataStorageConfiguration implements Serializable { return this; } + /** + * Gets compression algorithm for WAL page snapshot records. + * + * @return Page compression algorithm. + */ + public DiskPageCompression getWalPageCompression() { + return walPageCompression == null ? DFLT_WAL_PAGE_COMPRESSION : walPageCompression; + } + + /** + * Sets compression algorithm for WAL page snapshot records. + * + * @param walPageCompression Page compression algorithm. + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setWalPageCompression(DiskPageCompression walPageCompression) { + this.walPageCompression = walPageCompression; + + return this; + } + + /** + * Gets {@link #getWalPageCompression algorithm} specific WAL page compression level. + * + * @return WAL page snapshots compression level or {@code null} for default. + */ + public Integer getWalPageCompressionLevel() { + return walPageCompressionLevel; + } + + /** + * Sets {@link #setWalPageCompression algorithm} specific page compression level. + * + * @param walPageCompressionLevel Disk page compression level or {@code null} to use default. + * {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}). + * {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}). + * @return {@code this} for chaining. + */ + public DataStorageConfiguration setWalPageCompressionLevel(Integer walPageCompressionLevel) { + this.walPageCompressionLevel = walPageCompressionLevel; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStorageConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java index 5deda2f..29186f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java @@ -23,8 +23,13 @@ import org.jetbrains.annotations.Nullable; * * @see CacheConfiguration#setDiskPageCompression * @see CacheConfiguration#setDiskPageCompressionLevel + * @see DataStorageConfiguration#setWalPageCompression + * @see DataStorageConfiguration#setWalPageCompressionLevel */ public enum DiskPageCompression { + /** Compression disabled. */ + DISABLED, + /** Retain only useful data from half-filled pages, but do not apply any compression. */ SKIP_GARBAGE, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java index 6ae2c92..feae9d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java @@ -28,16 +28,16 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; /** * */ -public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ +public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware { /** */ @GridToStringExclude - private byte[] pageData; + private ByteBuffer pageData; /** */ private FullPageId fullPageId; /** - * PageSIze without encryption overhead. + * PageSize without encryption overhead. */ private int realPageSize; @@ -48,13 +48,16 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ */ public PageSnapshot(FullPageId fullId, byte[] arr, int realPageSize) { this.fullPageId = fullId; - this.pageData = arr; + this.pageData = ByteBuffer.wrap(arr).order(ByteOrder.nativeOrder()); this.realPageSize = realPageSize; } /** + * This constructor doesn't actually create a page snapshot (copy), it creates a wrapper over page memory region. + * A created record should not be used after WAL manager writes it to log, since page content can be modified. + * * @param fullPageId Full page ID. - * @param ptr Pointer to copy from. + * @param ptr Pointer to wrap. * @param pageSize Page size. * @param realPageSize Page size without encryption overhead. */ @@ -62,9 +65,7 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ this.fullPageId = fullPageId; this.realPageSize = realPageSize; - pageData = new byte[pageSize]; - - GridUnsafe.copyMemory(null, ptr, pageData, GridUnsafe.BYTE_ARR_OFF, pageSize); + pageData = GridUnsafe.wrapPointer(ptr, pageSize); } /** {@inheritDoc} */ @@ -76,6 +77,31 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ * @return Snapshot of page data. */ public byte[] pageData() { + if (!pageData.isDirect()) + return pageData.array(); + + // In case of direct buffer copy buffer content to new array. + byte[] arr = new byte[pageData.limit()]; + + GridUnsafe.copyMemory(null, GridUnsafe.bufferAddress(pageData), arr, GridUnsafe.BYTE_ARR_OFF, + pageData.limit()); + + return arr; + } + + /** + * @return Size of page data. + */ + public int pageDataSize() { + return pageData.limit(); + } + + /** + * @return Page data byte buffer. + */ + public ByteBuffer pageDataBuffer() { + pageData.rewind(); + return pageData; } @@ -87,10 +113,26 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ } /** {@inheritDoc} */ + @Override public int groupId() { + return fullPageId.groupId(); + } + + /** + * @return PageSize without encryption overhead. + */ + public int realPageSize() { + return realPageSize; + } + + /** {@inheritDoc} */ @Override public String toString() { - ByteBuffer buf = ByteBuffer.allocateDirect(pageData.length); - buf.order(ByteOrder.nativeOrder()); - buf.put(pageData); + ByteBuffer buf = pageData; + + if (!pageData.isDirect()) { + buf = ByteBuffer.allocateDirect(pageDataSize()); + buf.order(ByteOrder.nativeOrder()); + buf.put(pageData); + } long addr = GridUnsafe.bufferAddress(buf); @@ -101,16 +143,12 @@ public class PageSnapshot extends WALRecord implements WalRecordCacheGroupAware{ + super.toString() + "]]"; } catch (IgniteCheckedException ignored) { - return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() + - ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]"; + return "Error during call 'toString' of PageSnapshot [fullPageId=" + fullPageId() + + ", pageData = " + Arrays.toString(pageData()) + ", super=" + super.toString() + "]"; } finally { - GridUnsafe.cleanDirectBuffer(buf); + if (!pageData.isDirect()) + GridUnsafe.cleanDirectBuffer(buf); } } - - /** {@inheritDoc} */ - @Override public int groupId() { - return fullPageId.groupId(); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java index a050cd0..cac0064 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java @@ -44,13 +44,19 @@ public class CacheCompressionManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { + if (cctx.kernalContext().clientNode()) { + diskPageCompression = DiskPageCompression.DISABLED; + + return; + } + compressProc = cctx.kernalContext().compress(); CacheConfiguration cfg = cctx.config(); diskPageCompression = cctx.kernalContext().config().isClientMode() ? null : cfg.getDiskPageCompression(); - if (diskPageCompression != null) { + if (diskPageCompression != DiskPageCompression.DISABLED) { if (!cctx.dataRegion().config().isPersistenceEnabled()) throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence."); @@ -81,7 +87,7 @@ public class CacheCompressionManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException If failed. */ public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException { - if (diskPageCompression == null) + if (diskPageCompression == DiskPageCompression.DISABLED) return page; int blockSize = store.getBlockSize(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index d73b3da..a10877d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -151,6 +151,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp; @@ -2529,6 +2530,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData()); + + if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) { + int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId()); + + assert pageSnapshotRecord.pageDataSize() < realPageSize : pageSnapshotRecord.pageDataSize(); + + cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize); + } } finally { pageMem.writeUnlock(grpId, pageId, page, null, true, true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index ed7dd2f..23adeac 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -346,6 +346,10 @@ public class FilePageStore implements PageStore { if (inited) { long newSize = Math.max(pageSize, fileIO.size() - headerSize()); + // In the case of compressed pages we can miss the tail of the page. + if (newSize % pageSize != 0) + newSize += pageSize - newSize % pageSize; + long delta = newSize - allocated.getAndSet(newSize); assert delta % pageSize == 0 : delta; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index ed2d344..da787fb 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.stat.IoStatisticsHolder; import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp; @@ -967,7 +968,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (snapshot.fullPageId().equals(fullId)) { if (tmpAddr == null) { - assert snapshot.pageData().length <= pageSize() : snapshot.pageData().length; + assert snapshot.pageDataSize() <= pageSize() : snapshot.pageDataSize(); tmpAddr = GridUnsafe.allocateMemory(pageSize()); } @@ -976,6 +977,14 @@ public class PageMemoryImpl implements PageMemoryEx { curPage = wrapPointer(tmpAddr, pageSize()); PageUtils.putBytes(tmpAddr, 0, snapshot.pageData()); + + if (PageIO.getCompressionType(tmpAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) { + int realPageSize = realPageSize(snapshot.groupId()); + + assert snapshot.pageDataSize() < realPageSize : snapshot.pageDataSize(); + + ctx.kernalContext().compress().decompressPage(curPage, realPageSize); + } } break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java index 0e66b77..ab660d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java @@ -405,6 +405,14 @@ public abstract class PageIO { } /** + * @param pageAddr Page address. + * @return Compression type. + */ + public static byte getCompressionType(long pageAddr) { + return PageUtils.getByte(pageAddr, COMPRESSION_TYPE_OFF); + } + + /** * @param page Page buffer. * @param compressedSize Compressed size. */ @@ -421,6 +429,14 @@ public abstract class PageIO { } /** + * @param pageAddr Page address. + * @return Compressed size. + */ + public static short getCompressedSize(long pageAddr) { + return PageUtils.getShort(pageAddr, COMPRESSED_SIZE_OFF); + } + + /** * @param page Page buffer. * @param compactedSize Compacted size. */ @@ -438,6 +454,14 @@ public abstract class PageIO { /** * @param pageAddr Page address. + * @return Compacted size. + */ + public static short getCompactedSize(long pageAddr) { + return PageUtils.getShort(pageAddr, COMPACTED_SIZE_OFF); + } + + /** + * @param pageAddr Page address. * @return Checksum. */ public static int getCrc(long pageAddr) { @@ -839,9 +863,8 @@ public abstract class PageIO { assert pageSize <= out.remaining(); assert pageSize == page.remaining(); - page.mark(); - out.put(page).flip(); - page.reset(); + PageHandler.copyMemory(page, 0, out, 0, pageSize); + out.limit(pageSize); } /** @@ -857,7 +880,14 @@ public abstract class PageIO { .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr))) .a("\n],\n"); - io.printPage(addr, pageSize, sb); + if (getCompressionType(addr) != 0) { + sb.a("CompressedPage[\n\tcompressionType=").a(getCompressionType(addr)) + .a(",\n\tcompressedSize=").a(getCompressedSize(addr)) + .a(",\n\tcompactedSize=").a(getCompactedSize(addr)) + .a("\n]"); + } + else + io.printPage(addr, pageSize, sb); return sb.toString(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 0e45c26..7bf2e10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -55,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.events.WalSegmentArchivedEvent; @@ -103,9 +104,11 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.io.GridFileUtils; @@ -357,6 +360,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Switch segment record offset. */ private final AtomicLongArray switchSegmentRecordOffset; + /** Page snapshot records compression algorithm. */ + private DiskPageCompression pageCompression; + + /** Page snapshot records compression level. */ + private int pageCompressionLevel; + /** * @param ctx Kernal context. */ @@ -471,6 +480,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentRouter, ioFactory ); + + pageCompression = dsCfg.getWalPageCompression(); + + if (pageCompression != DiskPageCompression.DISABLED) { + if (serializerVer < 2) { + throw new IgniteCheckedException("WAL page snapshots compression not supported for serializerVer=" + + serializerVer); + } + + cctx.kernalContext().compress().checkPageCompressionSupported(); + + pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != null ? + CompressionProcessor.checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) : + CompressionProcessor.getDefaultCompressionLevel(pageCompression); + } } } @@ -769,6 +793,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (currWrHandle == null || (isDisable != null && isDisable.check())) return null; + // Do page snapshots compression if configured. + if (pageCompression != DiskPageCompression.DISABLED && rec instanceof PageSnapshot) { + PageSnapshot pageSnapshot = (PageSnapshot)rec; + + int pageSize = pageSnapshot.realPageSize(); + + ByteBuffer pageData = pageSnapshot.pageDataBuffer(); + + ByteBuffer compressedPage = cctx.kernalContext().compress().compressPage(pageData, pageSize, 1, + pageCompression, pageCompressionLevel); + + if (compressedPage != pageData) { + assert compressedPage.isDirect() : "Is direct buffer: " + compressedPage.isDirect(); + + rec = new PageSnapshot(pageSnapshot.fullPageId(), GridUnsafe.bufferAddress(compressedPage), + compressedPage.limit(), pageSize); + } + } + // Need to calculate record size first. rec.size(serializer.size(rec)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java index e94be9f..e1d7c9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataSerializer.java @@ -41,11 +41,12 @@ public interface RecordDataSerializer { * * @param type Record type. * @param in Buffer to read. + * @param size Record size (0 if unknown). * @return WAL record. * @throws IOException In case of I/O problems. * @throws IgniteCheckedException If it's unable to read record. */ - WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in) throws IOException, IgniteCheckedException; + WALRecord readRecord(WALRecord.RecordType type, ByteBufferBackedDataInput in, int size) throws IOException, IgniteCheckedException; /** * Writes record data to buffer {@code buf}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index c5ede10..dd6393f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -127,10 +127,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer { protected final GridCacheSharedContext cctx; /** Size of page used for PageMemory regions. */ - private final int pageSize; + protected final int pageSize; /** Size of page without encryption overhead. */ - private final int realPageSize; + protected final int realPageSize; /** Cache object processor to reading {@link DataEntry DataEntries}. */ protected final IgniteCacheObjectProcessor co; @@ -184,7 +184,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { } /** {@inheritDoc} */ - @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in) + @Override public WALRecord readRecord(RecordType type, ByteBufferBackedDataInput in, int size) throws IOException, IgniteCheckedException { if (type == ENCRYPTED_RECORD) { if (encSpi == null) { @@ -201,10 +201,10 @@ public class RecordDataV1Serializer implements RecordDataSerializer { if (clData.get1() == null) return new EncryptedRecord(clData.get2(), clData.get3()); - return readPlainRecord(clData.get3(), clData.get1(), true); + return readPlainRecord(clData.get3(), clData.get1(), true, clData.get1().buffer().capacity()); } - return readPlainRecord(type, in, false); + return readPlainRecord(type, in, false, size); } /** {@inheritDoc} */ @@ -346,7 +346,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { PageSnapshot pageRec = (PageSnapshot)record; - return pageRec.pageData().length + 12; + return pageRec.pageDataSize() + 12; case CHECKPOINT_RECORD: CheckpointRecord cpRec = (CheckpointRecord)record; @@ -532,12 +532,13 @@ public class RecordDataV1Serializer implements RecordDataSerializer { * @param type Record type. * @param in Input * @param encrypted Record was encrypted. + * @param recordSize Record size. * @return Deserialized record. * @throws IOException If failed. * @throws IgniteCheckedException If failed. */ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, - boolean encrypted) throws IOException, IgniteCheckedException { + boolean encrypted, int recordSize) throws IOException, IgniteCheckedException { WALRecord res; switch (type) { @@ -1160,7 +1161,7 @@ public class RecordDataV1Serializer implements RecordDataSerializer { buf.putInt(snap.fullPageId().groupId()); buf.putLong(snap.fullPageId().pageId()); - buf.put(snap.pageData()); + buf.put(snap.pageDataBuffer()); break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java index 8533e73..52714c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV2Serializer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.wal.record.CacheState; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.ConsistentCutRecord; @@ -36,6 +37,7 @@ import org.apache.ignite.internal.pagemem.wal.record.LazyMvccDataEntry; import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry; import org.apache.ignite.internal.pagemem.wal.record.MvccDataRecord; import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.SnapshotRecord; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; @@ -121,9 +123,20 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer { @Override WALRecord readPlainRecord( RecordType type, ByteBufferBackedDataInput in, - boolean encrypted + boolean encrypted, + int recordSize ) throws IOException, IgniteCheckedException { switch (type) { + case PAGE_RECORD: + int cacheId = in.readInt(); + long pageId = in.readLong(); + + byte[] arr = new byte[recordSize - 4 /* cacheId */ - 8 /* pageId */]; + + in.readFully(arr); + + return new PageSnapshot(new FullPageId(pageId, cacheId), arr, encrypted ? realPageSize : pageSize); + case CHECKPOINT_RECORD: long msb = in.readLong(); long lsb = in.readLong(); @@ -200,7 +213,7 @@ public class RecordDataV2Serializer extends RecordDataV1Serializer { return new ConsistentCutRecord(); default: - return super.readPlainRecord(type, in, encrypted); + return super.readPlainRecord(type, in, encrypted, recordSize); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java index 288be5a..ec0ddc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java @@ -139,7 +139,7 @@ public class RecordV1Serializer implements RecordSerializer { if (recType == null) throw new IOException("Unknown record type: " + recType); - final WALRecord rec = dataSerializer.readRecord(recType, in); + final WALRecord rec = dataSerializer.readRecord(recType, in, 0); rec.position(ptr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java index 872e47d..5a6db67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV2Serializer.java @@ -159,7 +159,8 @@ public class RecordV2Serializer implements RecordSerializer { return new MarshalledRecord(recType, ptr, buf); } else { - WALRecord rec = dataSerializer.readRecord(recType, in); + WALRecord rec = dataSerializer.readRecord(recType, in, ptr.length() - REC_TYPE_SIZE - + FILE_WAL_POINTER_SIZE - CRC_SIZE); rec.position(ptr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java index 4f2227e..6c14d6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java @@ -50,7 +50,7 @@ public class CompressionProcessor extends GridProcessorAdapter { public static final int ZSTD_DEFAULT_LEVEL = 3; /** */ - protected static final byte UNCOMPRESSED_PAGE = 0; + public static final byte UNCOMPRESSED_PAGE = 0; /** */ protected static final byte COMPACTED_PAGE = 1; @@ -133,6 +133,15 @@ public class CompressionProcessor extends GridProcessorAdapter { } /** + * Checks weither page compression is supported. + * + * @throws IgniteCheckedException If compression is not supported. + */ + public void checkPageCompressionSupported() throws IgniteCheckedException { + fail(); + } + + /** * @param storagePath Storage path. * @param pageSize Page size. * @throws IgniteCheckedException If compression is not supported. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 167571d..2f74ac9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -60,6 +60,7 @@ import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.MemoryPolicyConfiguration; @@ -1922,6 +1923,11 @@ public class PlatformConfigurationUtils { if (in.readBoolean()) res.setCheckpointReadLockTimeout(in.readLong()); + res.setWalPageCompression(DiskPageCompression.fromOrdinal(in.readInt())); + + if (in.readBoolean()) + res.setWalPageCompressionLevel(in.readInt()); + int cnt = in.readInt(); if (cnt > 0) { @@ -2058,6 +2064,15 @@ public class PlatformConfigurationUtils { else w.writeBoolean(false); + w.writeInt(cfg.getWalPageCompression().ordinal()); + + if (cfg.getWalPageCompressionLevel() != null) { + w.writeBoolean(true); + w.writeInt(cfg.getWalPageCompressionLevel()); + } + else + w.writeBoolean(false); + if (cfg.getDataRegionConfigurations() != null) { w.writeInt(cfg.getDataRegionConfigurations().length); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java index f0f1d8a..05ba134 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridStringBuilder; @@ -27,6 +28,9 @@ import org.apache.ignite.internal.util.GridStringBuilder; */ public class DummyPageIO extends PageIO implements CompactablePageIO { /** */ + public static final IOVersions<DummyPageIO> VERSIONS = new IOVersions<>(new DummyPageIO()); + + /** */ public DummyPageIO() { super(2 * Short.MAX_VALUE, 1); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java index c44cbd0..5921c51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.pagemem.FullPageId; @@ -211,7 +212,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom mem = shared.database().dataRegion(null).pageMemory(); - verifyReads(res.get1(), mem, res.get2(), shared.wal()); + verifyReads(ig.context(), res.get1(), mem, res.get2(), shared.wal()); } /** @@ -679,7 +680,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); try { - DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId())); + DummyPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId())); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -700,6 +701,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom * @param mem Memory. */ private void verifyReads( + GridKernalContext ctx, Map<FullPageId, Integer> res, PageMemory mem, WALPointer start, @@ -707,6 +709,8 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom ) throws Exception { Map<FullPageId, byte[]> replay = new HashMap<>(); + ByteBuffer buf = ByteBuffer.allocateDirect(mem.pageSize()).order(ByteOrder.nativeOrder()); + try (PartitionMetaStateRecordExcludeIterator it = new PartitionMetaStateRecordExcludeIterator(wal.replay(start))) { IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); @@ -729,7 +733,22 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom else if (rec instanceof PageSnapshot) { PageSnapshot page = (PageSnapshot)rec; - replay.put(page.fullPageId(), page.pageData()); + int realPageSize = mem.realPageSize(page.groupId()); + + byte[] pageData = page.pageData(); + + if (page.pageDataSize() < realPageSize) { + buf.clear(); + buf.put(pageData).flip(); + + ctx.compress().decompressPage(buf, realPageSize); + + pageData = new byte[buf.limit()]; + + buf.get(pageData); + } + + replay.put(page.fullPageId(), pageData); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java index 729f9ac..427e80c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal; import java.io.File; import java.io.FilenameFilter; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; import org.apache.ignite.Ignite; @@ -37,9 +38,11 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.RolloverType; +import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -172,9 +175,11 @@ public class WalCompactionTest extends GridCommonAbstractTest { cache.put(i, val); } + byte[] dummyPage = dummyPage(pageSize); + // Spam WAL to move all data records to compressible WAL zone. for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) { - ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize], + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage, pageSize)); } @@ -479,9 +484,11 @@ public class WalCompactionTest extends GridCommonAbstractTest { cache.put(i, val); } + byte[] dummyPage = dummyPage(pageSize); + // Spam WAL to move all data records to compressible WAL zone. for (int i = 0; i < WAL_SEGMENT_SIZE / pageSize * 2; i++) { - ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), new byte[pageSize], + ig.context().cache().context().wal().log(new PageSnapshot(new FullPageId(-1, -1), dummyPage, pageSize)); } @@ -564,4 +571,19 @@ public class WalCompactionTest extends GridCommonAbstractTest { assertFalse(fail); } + + /** + * @param pageSize Page size. + */ + private static byte[] dummyPage(int pageSize) { + ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize); + + DummyPageIO.VERSIONS.latest().initNewPage(GridUnsafe.bufferAddress(pageBuf), -1, pageSize); + + byte[] pageData = new byte[pageSize]; + + pageBuf.get(pageData); + + return pageData; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java index 560d707..b013d24 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALPointer; @@ -424,7 +423,7 @@ public class PageMemoryTracker implements IgnitePlugin { page.lock(); try { - PageUtils.putBytes(page.address(), 0, snapshot.pageData()); + GridUnsafe.copyMemory(GridUnsafe.bufferAddress(snapshot.pageDataBuffer()), page.address(), pageSize); page.changeHistory().clear(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java index 18df698..e1de9fd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java @@ -54,6 +54,7 @@ import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.DiscoverySpiTestListener; @@ -72,6 +73,7 @@ import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -82,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp; @@ -162,7 +166,9 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { /** */ private long checkpointFrequency = DFLT_CHECKPOINT_FREQ; - ; + + /** WAL page snapshots records compression method. */ + protected DiskPageCompression walPageCompression; /** {@inheritDoc} */ @Override protected boolean isMultiJvm() { @@ -213,6 +219,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { dbCfg.setWalSegments(walSegments); + dbCfg.setWalPageCompression(walPageCompression); + dbCfg.setCheckpointFrequency(checkpointFrequency); cfg.setDataStorageConfiguration(dbCfg); @@ -1357,6 +1365,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { ByteBuffer buf = ByteBuffer.allocateDirect(pageSize); + buf.order(ByteOrder.nativeOrder()); + // Now check that deltas can be correctly applied. try (WALIterator it = sharedCtx.wal().replay(ptr)) { while (it.hasNext()) { @@ -1367,7 +1377,28 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { if (rec instanceof PageSnapshot) { PageSnapshot page = (PageSnapshot)rec; - rolledPages.put(page.fullPageId(), page.pageData()); + CacheGroupContext gctx = sharedCtx.cache().cacheGroup(page.groupId()); + + int realPageSize = gctx == null ? pageSize + : gctx.dataRegion().pageMemory().realPageSize(page.groupId()); + + byte[] pageData = page.pageData(); + + if (pageData.length < realPageSize) { + buf.clear(); + buf.put(pageData); + buf.flip(); + + sharedCtx.kernalContext().compress().decompressPage(buf, realPageSize); + + pageData = new byte[realPageSize]; + + buf.position(0); + + buf.get(pageData); + } + + rolledPages.put(page.fullPageId(), pageData); } else if (rec instanceof PageDeltaRecord) { PageDeltaRecord delta = (PageDeltaRecord)rec; @@ -1384,17 +1415,13 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData); - buf.order(ByteOrder.nativeOrder()); - - buf.position(0); + buf.clear(); buf.put(pageData); - buf.position(0); + buf.flip(); delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(), GridUnsafe.bufferAddress(buf)); - buf.position(0); - buf.get(pageData); } } @@ -1404,6 +1431,8 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory(); + ByteBuffer bufWal = ByteBuffer.allocateDirect(pageSize); + for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) { FullPageId fullId = entry.getKey(); @@ -1419,12 +1448,30 @@ public class IgniteWalRecoveryTest extends GridCommonAbstractTest { try { byte[] data = entry.getValue(); - for (int i = 0; i < data.length; i++) { - if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize())) - continue; // Skip tracking pages. + if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize())) + continue; // Skip tracking pages. - assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]); + // Compaction/restoring page can left some trash in unused space, so we need to compare + // compacted pages in case of compaction is used. + if (walPageCompression != null && PageIO.getPageIO(bufPtr) instanceof CompactablePageIO) { + CompactablePageIO pageIO = PageIO.getPageIO(bufPtr); + + buf.clear(); + bufWal.clear(); + + int realPageSize = data.length; + + pageIO.compactPage(GridUnsafe.wrapPointer(bufPtr, realPageSize), buf, realPageSize); + pageIO.compactPage(ByteBuffer.wrap(data), bufWal, realPageSize); + + bufPtr = GridUnsafe.bufferAddress(buf); + data = new byte[bufWal.limit()]; + bufWal.rewind(); + bufWal.get(data); } + + for (int i = 0; i < data.length; i++) + assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]); } finally { pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml index 6c6d69e..bf6715f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml @@ -149,7 +149,8 @@ walThreadLocalBufferSize="11" walArchivePath="abc" walFlushFrequency="00:00:12" walFsyncDelayNanos="13" walHistorySize="14" walMode="Background" walRecordIteratorBufferSize="15" walSegments="16" walSegmentSize="17" - walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18"> + walPath="wal-store" writeThrottlingEnabled="true" walAutoArchiveAfterInactivity="00:00:18" + walPageCompression="Zstd"> <dataRegionConfigurations> <dataRegionConfiguration emptyPagesPoolSize="1" evictionThreshold="2" initialSize="3" metricsEnabled="true" maxSize="4" name="reg2" pageEvictionMode="RandomLru" metricsRateTimeInterval="00:00:01" diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs index 0135e80..4961954 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs @@ -323,6 +323,7 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual("wal-store", ds.WalPath); Assert.AreEqual(TimeSpan.FromSeconds(18), ds.WalAutoArchiveAfterInactivity); Assert.IsTrue(ds.WriteThrottlingEnabled); + Assert.AreEqual(DiskPageCompression.Zstd, ds.WalPageCompression); var dr = ds.DataRegionConfigurations.Single(); Assert.AreEqual(1, dr.EmptyPagesPoolSize); @@ -986,6 +987,8 @@ namespace Apache.Ignite.Core.Tests ConcurrencyLevel = 1, PageSize = 5 * 1024, WalAutoArchiveAfterInactivity = TimeSpan.FromSeconds(19), + WalPageCompression = DiskPageCompression.Lz4, + WalPageCompressionLevel = 10, DefaultDataRegionConfiguration = new DataRegionConfiguration { Name = "reg1", diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index ff96dc4..b99f2a8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -63,6 +63,7 @@ <Compile Include="Common\IgniteProductVersion.cs" /> <Compile Include="Configuration\CheckpointWriteOrder.cs" /> <Compile Include="Configuration\DataPageEvictionMode.cs" /> + <Compile Include="Configuration\DiskPageCompression.cs" /> <Compile Include="Configuration\DataRegionConfiguration.cs" /> <Compile Include="Configuration\DataStorageConfiguration.cs" /> <Compile Include="Cache\Configuration\MemoryPolicyConfiguration.cs" /> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs index d5cfe85..2ca4108 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DataStorageConfiguration.cs @@ -169,6 +169,11 @@ namespace Apache.Ignite.Core.Configuration public const long DefaultMaxWalArchiveSize = 1024 * 1024 * 1024; /// <summary> + /// Default value for <see cref="WalPageCompression"/>. + /// </summary> + public const DiskPageCompression DefaultWalPageCompression = DiskPageCompression.Disabled; + + /// <summary> /// Initializes a new instance of the <see cref="DataStorageConfiguration"/> class. /// </summary> public DataStorageConfiguration() @@ -196,6 +201,7 @@ namespace Apache.Ignite.Core.Configuration PageSize = DefaultPageSize; WalAutoArchiveAfterInactivity = DefaultWalAutoArchiveAfterInactivity; MaxWalArchiveSize = DefaultMaxWalArchiveSize; + WalPageCompression = DefaultWalPageCompression; } /// <summary> @@ -236,6 +242,8 @@ namespace Apache.Ignite.Core.Configuration ConcurrencyLevel = reader.ReadInt(); WalAutoArchiveAfterInactivity = reader.ReadLongAsTimespan(); CheckpointReadLockTimeout = reader.ReadTimeSpanNullable(); + WalPageCompression = (DiskPageCompression)reader.ReadInt(); + WalPageCompressionLevel = reader.ReadIntNullable(); var count = reader.ReadInt(); @@ -290,6 +298,8 @@ namespace Apache.Ignite.Core.Configuration writer.WriteInt(ConcurrencyLevel); writer.WriteTimeSpanAsLong(WalAutoArchiveAfterInactivity); writer.WriteTimeSpanAsLongNullable(CheckpointReadLockTimeout); + writer.WriteInt((int)WalPageCompression); + writer.WriteIntNullable(WalPageCompressionLevel); if (DataRegionConfigurations != null) { @@ -498,6 +508,16 @@ namespace Apache.Ignite.Core.Configuration public TimeSpan? CheckpointReadLockTimeout { get; set; } /// <summary> + /// Gets or sets the compression algorithm for WAL page snapshot records. + /// </summary> + public DiskPageCompression WalPageCompression { get; set; } + + /// <summary> + /// Gets or sets the compression level for WAL page snapshot records. + /// </summary> + public int? WalPageCompressionLevel { get; set; } + + /// <summary> /// Gets or sets the data region configurations. /// </summary> [SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")] @@ -508,4 +528,4 @@ namespace Apache.Ignite.Core.Configuration /// </summary> public DataRegionConfiguration DefaultDataRegionConfiguration { get; set; } } -} \ No newline at end of file +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs new file mode 100644 index 0000000..133ecd1 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/DiskPageCompression.cs @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Configuration +{ + /// <summary> + /// Disk page compression options. + /// </summary> + public enum DiskPageCompression + { + /// <summary> + /// Compression disabled. + /// </summary> + Disabled, + + /// <summary> + /// Retain only useful data from half-filled pages, but do not apply any compression. + /// </summary> + SkipGarbage, + + /// <summary> + /// Zstd compression. + /// </summary> + Zstd, + + /// <summary> + /// Lz4 compression. + /// </summary> + Lz4, + + /// <summary> + /// Snappy compression. + /// </summary> + Snappy + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index c23cf90..9179048 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -117,6 +117,16 @@ </xs:restriction> </xs:simpleType> + <xs:simpleType name="diskPageCompression" final="restriction"> + <xs:restriction base="xs:string"> + <xs:enumeration value="Disabled" /> + <xs:enumeration value="SkipGarbage" /> + <xs:enumeration value="Zstd" /> + <xs:enumeration value="Lz4" /> + <xs:enumeration value="Snappy" /> + </xs:restriction> + </xs:simpleType> + <xs:element name="igniteConfiguration"> <xs:annotation> <xs:documentation>Ignite configuration root.</xs:documentation> @@ -1948,6 +1958,16 @@ </xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="walPageCompression" type="diskPageCompression"> + <xs:annotation> + <xs:documentation>Compression algorithm for WAL page snapshot records.</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute name="walPageCompressionLevel" type="xs:int"> + <xs:annotation> + <xs:documentation>Compression level for WAL page snapshot records.</xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> <xs:element name="sslContextFactory" minOccurs="0">
