This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0024e37136c IGNITE-21886 Refactor CompressionProcessorImpl, move code
partially to ignite-core module - Fixes #11290.
0024e37136c is described below
commit 0024e37136c6da493da09830f1fe3c729753952d
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Apr 5 10:03:08 2024 +0300
IGNITE-21886 Refactor CompressionProcessorImpl, move code partially to
ignite-core module - Fixes #11290.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../compress/CompressionProcessorImpl.java | 229 ++++-----------------
.../persistence/wal/FileWriteAheadLogManager.java | 10 +-
.../processors/compress/CompressionHandler.java | 4 +-
.../processors/compress/CompressionProcessor.java | 208 ++++++++++++++++++-
.../wal/WalPageRecordCompactionTest.java | 86 ++++++++
.../ignite/testsuites/IgnitePdsTestSuite5.java | 2 +
6 files changed, 330 insertions(+), 209 deletions(-)
diff --git
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
index 8fbfc44949a..831511b1b08 100644
---
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
+++
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -30,9 +30,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
-import org.apache.ignite.internal.pagemem.PageUtils;
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
-import
org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -41,16 +39,12 @@ import org.xerial.snappy.Snappy;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import static
org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
-import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
/**
* Compression processor.
*/
public class CompressionProcessorImpl extends CompressionProcessor {
- /** Max page size. */
- private final ThreadLocalDirectByteBuffer compactBuf = new
ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
-
/** A bit more than max page size, extra space is required by compressors.
*/
private final ThreadLocalDirectByteBuffer compressBuf =
new
ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE),
NATIVE_BYTE_ORDER);
@@ -92,84 +86,6 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
checkPunchHole(storagePath, fsBlockSize);
}
- /** {@inheritDoc} */
- @Override public ByteBuffer compressPage(
- ByteBuffer page,
- int pageSize,
- int blockSize,
- DiskPageCompression compression,
- int compressLevel
- ) throws IgniteCheckedException {
- assert compression != null && compression !=
DiskPageCompression.DISABLED : compression;
- assert U.isPow2(blockSize) : blockSize;
- assert page.position() == 0 && page.limit() >= pageSize;
-
- int oldPageLimit = page.limit();
-
- try {
- // Page size will be less than page limit when TDE is enabled. To
make compaction and compression work
- // correctly we need to set limit to real page size.
- page.limit(pageSize);
-
- ByteBuffer compactPage = doCompactPage(page, pageSize);
-
- int compactSize = compactPage.limit();
-
- assert compactSize <= pageSize : compactSize;
-
- // If no need to compress further or configured just to skip
garbage.
- if (compactSize < blockSize || compression == SKIP_GARBAGE)
- return setCompactionInfo(compactPage, compactSize);
-
- ByteBuffer compressedPage = doCompressPage(compression,
compactPage, compactSize, compressLevel);
-
- assert compressedPage.position() == 0;
- int compressedSize = compressedPage.limit();
-
- int freeCompactBlocks = (pageSize - compactSize) / blockSize;
- int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
-
- if (freeCompactBlocks >= freeCompressedBlocks) {
- if (freeCompactBlocks == 0)
- return page; // No blocks will be released.
-
- return setCompactionInfo(compactPage, compactSize);
- }
-
- return setCompressionInfo(compressedPage, compression,
compressedSize, compactSize);
- }
- finally {
- page.limit(oldPageLimit);
- }
- }
-
- /**
- * @param page Page buffer.
- * @param pageSize Page size.
- * @return Compacted page buffer.
- */
- private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws
IgniteCheckedException {
- PageIO io = PageIO.getPageIO(page);
-
- ByteBuffer compactPage = compactBuf.get();
-
- if (io instanceof CompactablePageIO) {
- // Drop the garbage from the page.
- ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
- }
- else {
- // Direct buffer is required as output of this method.
- if (page.isDirect())
- return page;
-
- PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0,
page.array());
-
- compactPage.limit(pageSize);
- }
-
- return compactPage;
- }
-
/** Check if filesystem actually supports punching holes. */
private void checkPunchHole(Path storagePath, int fsBlockSz) throws
IgniteException {
ByteBuffer buf = null;
@@ -198,33 +114,6 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
}
}
- /**
- * @param page Page.
- * @param compactSize Compacted page size.
- * @return The given page.
- */
- private static ByteBuffer setCompactionInfo(ByteBuffer page, int
compactSize) {
- return setCompressionInfo(page, SKIP_GARBAGE, compactSize,
compactSize);
- }
-
- /**
- * @param page Page.
- * @param compression Compression algorithm.
- * @param compressedSize Compressed size.
- * @param compactedSize Compact size.
- * @return The given page.
- */
- private static ByteBuffer setCompressionInfo(ByteBuffer page,
DiskPageCompression compression, int compressedSize, int compactedSize) {
- assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE :
compressedSize;
- assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE :
compactedSize;
-
- PageIO.setCompressionType(page, getCompressionType(compression));
- PageIO.setCompressedSize(page, (short)compressedSize);
- PageIO.setCompactedSize(page, (short)compactedSize);
-
- return page;
- }
-
/**
* @param compression Compression algorithm.
* @param compactPage Compacted page.
@@ -232,7 +121,12 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
* @param compressLevel Compression level.
* @return Compressed page.
*/
- private ByteBuffer doCompressPage(DiskPageCompression compression,
ByteBuffer compactPage, int compactSize, int compressLevel) {
+ @Override protected ByteBuffer doCompressPage(
+ DiskPageCompression compression,
+ ByteBuffer compactPage,
+ int compactSize,
+ int compressLevel
+ ) {
switch (compression) {
case ZSTD:
return compressPageZstd(compactPage, compactSize,
compressLevel);
@@ -319,99 +213,46 @@ public class CompressionProcessorImpl extends
CompressionProcessor {
compactPage.limit(compactSize);
}
- /**
- * @param compression Compression.
- * @return Level.
- */
- private static byte getCompressionType(DiskPageCompression compression) {
- if (compression == DiskPageCompression.DISABLED)
- return UNCOMPRESSED_PAGE;
-
- switch (compression) {
- case ZSTD:
- return ZSTD_COMPRESSED_PAGE;
-
- case LZ4:
- return LZ4_COMPRESSED_PAGE;
-
- case SNAPPY:
- return SNAPPY_COMPRESSED_PAGE;
-
- case SKIP_GARBAGE:
- return COMPACTED_PAGE;
- }
- throw new IllegalStateException("Unexpected compression: " +
compression);
- }
-
/** {@inheritDoc} */
- @Override public void decompressPage(ByteBuffer page, int pageSize) throws
IgniteCheckedException {
- assert page.capacity() >= pageSize : "capacity=" + page.capacity() +
", pageSize=" + pageSize;
-
- byte compressType = PageIO.getCompressionType(page);
-
- if (compressType == UNCOMPRESSED_PAGE)
- return; // Nothing to do.
+ @Override protected void doDecompressPage(int compressType, ByteBuffer
page, int compressedSize, int compactSize) {
+ ByteBuffer dst = compressBuf.get();
- short compressedSize = PageIO.getCompressedSize(page);
- short compactSize = PageIO.getCompactedSize(page);
+ // Position on a part that needs to be decompressed.
+ page.limit(compressedSize)
+ .position(PageIO.COMMON_HEADER_END);
- assert compactSize <= pageSize && compactSize >= compressedSize;
+ // LZ4 needs this limit to be exact.
+ dst.limit(compactSize - PageIO.COMMON_HEADER_END);
- if (compressType == COMPACTED_PAGE) {
- // Just setup bounds before restoring the page.
- page.position(0).limit(compactSize);
- }
- else {
- ByteBuffer dst = compressBuf.get();
-
- // Position on a part that needs to be decompressed.
- page.limit(compressedSize)
- .position(PageIO.COMMON_HEADER_END);
-
- // LZ4 needs this limit to be exact.
- dst.limit(compactSize - PageIO.COMMON_HEADER_END);
-
- switch (compressType) {
- case ZSTD_COMPRESSED_PAGE:
- Zstd.decompress(dst, page);
- dst.flip();
-
- break;
+ switch (compressType) {
+ case ZSTD_COMPRESSED_PAGE:
+ Zstd.decompress(dst, page);
+ dst.flip();
- case LZ4_COMPRESSED_PAGE:
- Lz4.decompress(page, dst);
- dst.flip();
+ break;
- break;
+ case LZ4_COMPRESSED_PAGE:
+ Lz4.decompress(page, dst);
+ dst.flip();
- case SNAPPY_COMPRESSED_PAGE:
- try {
- Snappy.uncompress(page, dst);
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- break;
-
- default:
- throw new IgniteException("Unknown compression: " +
compressType);
- }
-
- page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
- page.put(dst).flip();
- assert page.limit() == compactSize;
- }
+ break;
- PageIO io = PageIO.getPageIO(page);
+ case SNAPPY_COMPRESSED_PAGE:
+ try {
+ Snappy.uncompress(page, dst);
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ break;
- if (io instanceof CompactablePageIO)
- ((CompactablePageIO)io).restorePage(page, pageSize);
- else {
- assert compactSize == pageSize
- : "Wrong compacted page size [compactSize=" + compactSize + ",
pageSize=" + pageSize + ']';
+ default:
+ throw new IgniteException("Unknown compression: " +
compressType);
}
- setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
+ page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
+ page.put(dst).flip();
+ assert page.limit() == compactSize;
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 12c40e81081..2c3de5fa326 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -161,8 +161,7 @@ import static
org.apache.ignite.internal.processors.cache.persistence.wal.serial
import static
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
-import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
-import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
+import static
org.apache.ignite.internal.processors.compress.CompressionProcessor.getCompressionLevel;
import static
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty;
import static
org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
@@ -605,11 +604,10 @@ public class FileWriteAheadLogManager extends
GridCacheSharedManagerAdapter impl
serializerVer);
}
-
cctx.kernalContext().compress().checkPageCompressionSupported();
+ if (pageCompression != DiskPageCompression.SKIP_GARBAGE)
+
cctx.kernalContext().compress().checkPageCompressionSupported();
- pageCompressionLevel = dsCfg.getWalPageCompressionLevel() !=
null ?
-
checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(),
pageCompression) :
- getDefaultCompressionLevel(pageCompression);
+ pageCompressionLevel =
getCompressionLevel(dsCfg.getWalPageCompressionLevel(), pageCompression);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
index a1dd5146aad..faa1eb7e0b6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
@@ -115,9 +115,7 @@ public class CompressionHandler {
CompressionProcessor comprProc = ctx.compress();
- int lvl = cfg.getDiskPageCompressionLevel() == null ?
- CompressionProcessor.getDefaultCompressionLevel(diskPageCompr) :
-
CompressionProcessor.checkCompressionLevelBounds(cfg.getDiskPageCompressionLevel(),
diskPageCompr);
+ int lvl =
CompressionProcessor.getCompressionLevel(cfg.getDiskPageCompressionLevel(),
diskPageCompr);
File dbPath =
ctx.pdsFolderResolver().resolveFolders().persistentStoreRootPath();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
index fe760712161..051b883b1fc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
@@ -23,8 +23,17 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
+import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import
org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static
org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
/**
* Compression processor.
@@ -65,6 +74,9 @@ public class CompressionProcessor extends
GridProcessorAdapter {
/** */
protected static final byte SNAPPY_COMPRESSED_PAGE = 4;
+ /** Max page size. */
+ private final ThreadLocalDirectByteBuffer compactBuf = new
ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
+
/**
* @param ctx Kernal context.
*/
@@ -72,6 +84,16 @@ public class CompressionProcessor extends
GridProcessorAdapter {
super(ctx);
}
+ /**
+ * @param compressLevel Compression level.
+ * @param compression Compression algorithm.
+ * @return Compression level.
+ */
+ public static int getCompressionLevel(Integer compressLevel,
DiskPageCompression compression) {
+ return compressLevel != null ?
checkCompressionLevelBounds(compressLevel, compression) :
+ getDefaultCompressionLevel(compression);
+ }
+
/**
* @param compression Compression algorithm.
* @return Default compression level.
@@ -86,6 +108,7 @@ public class CompressionProcessor extends
GridProcessorAdapter {
case SNAPPY:
case SKIP_GARBAGE:
+ case DISABLED:
return 0;
}
@@ -134,7 +157,7 @@ public class CompressionProcessor extends
GridProcessorAdapter {
}
/**
- * Checks weither page compression is supported.
+ * Checks whether page compression can be used for page file storage.
*
* @throws IgniteCheckedException If compression is not supported.
*/
@@ -143,6 +166,8 @@ public class CompressionProcessor extends
GridProcessorAdapter {
}
/**
+ * Checks whether page file storage supports compression.
+ *
* @param storagePath Storage path.
* @param pageSize Page size.
* @throws IgniteCheckedException If compression is not supported.
@@ -151,10 +176,66 @@ public class CompressionProcessor extends
GridProcessorAdapter {
fail();
}
+ /**
+ * @param page Page.
+ * @param compactSize Compacted page size.
+ * @return The given page.
+ */
+ protected static ByteBuffer setCompactionInfo(ByteBuffer page, int
compactSize) {
+ return setCompressionInfo(page, SKIP_GARBAGE, compactSize,
compactSize);
+ }
+
+ /**
+ * @param page Page.
+ * @param compression Compression algorithm.
+ * @param compressedSize Compressed size.
+ * @param compactedSize Compact size.
+ * @return The given page.
+ */
+ protected static ByteBuffer setCompressionInfo(
+ ByteBuffer page,
+ DiskPageCompression compression,
+ int compressedSize,
+ int compactedSize
+ ) {
+ assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE :
compressedSize;
+ assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE :
compactedSize;
+
+ PageIO.setCompressionType(page, getCompressionType(compression));
+ PageIO.setCompressedSize(page, (short)compressedSize);
+ PageIO.setCompactedSize(page, (short)compactedSize);
+
+ return page;
+ }
+
+ /**
+ * @param compression Compression.
+ * @return Level.
+ */
+ private static byte getCompressionType(DiskPageCompression compression) {
+ if (compression == DiskPageCompression.DISABLED)
+ return UNCOMPRESSED_PAGE;
+
+ switch (compression) {
+ case ZSTD:
+ return ZSTD_COMPRESSED_PAGE;
+
+ case LZ4:
+ return LZ4_COMPRESSED_PAGE;
+
+ case SNAPPY:
+ return SNAPPY_COMPRESSED_PAGE;
+
+ case SKIP_GARBAGE:
+ return COMPACTED_PAGE;
+ }
+ throw new IllegalStateException("Unexpected compression: " +
compression);
+ }
+
/**
* @param page Page buffer.
* @param pageSize Page size.
- * @param storeBlockSize Store block size.
+ * @param blockSize Store block size.
* @param compression Compression algorithm.
* @param compressLevel Compression level.
* @return Possibly compressed buffer.
@@ -163,11 +244,94 @@ public class CompressionProcessor extends
GridProcessorAdapter {
public ByteBuffer compressPage(
ByteBuffer page,
int pageSize,
- int storeBlockSize,
+ int blockSize,
DiskPageCompression compression,
int compressLevel
) throws IgniteCheckedException {
- return fail();
+ assert compression != null && compression !=
DiskPageCompression.DISABLED : compression;
+ assert U.isPow2(blockSize) : blockSize;
+ assert page.position() == 0 && page.limit() >= pageSize;
+
+ int oldPageLimit = page.limit();
+
+ try {
+ // Page size will be less than page limit when TDE is enabled. To
make compaction and compression work
+ // correctly we need to set limit to real page size.
+ page.limit(pageSize);
+
+ ByteBuffer compactPage = doCompactPage(page, pageSize);
+
+ int compactSize = compactPage.limit();
+
+ assert compactSize <= pageSize : compactSize;
+
+ // If no need to compress further or configured just to skip
garbage.
+ if (compactSize < blockSize || compression == SKIP_GARBAGE)
+ return setCompactionInfo(compactPage, compactSize);
+
+ ByteBuffer compressedPage = doCompressPage(compression,
compactPage, compactSize, compressLevel);
+
+ assert compressedPage.position() == 0;
+ int compressedSize = compressedPage.limit();
+
+ int freeCompactBlocks = (pageSize - compactSize) / blockSize;
+ int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+
+ if (freeCompactBlocks >= freeCompressedBlocks) {
+ if (freeCompactBlocks == 0)
+ return page; // No blocks will be released.
+
+ return setCompactionInfo(compactPage, compactSize);
+ }
+
+ return setCompressionInfo(compressedPage, compression,
compressedSize, compactSize);
+ }
+ finally {
+ page.limit(oldPageLimit);
+ }
+ }
+
+ /**
+ * @param page Page buffer.
+ * @param pageSize Page size.
+ * @return Compacted page buffer.
+ */
+ protected ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws
IgniteCheckedException {
+ PageIO io = PageIO.getPageIO(page);
+
+ ByteBuffer compactPage = compactBuf.get();
+
+ if (io instanceof CompactablePageIO) {
+ // Drop the garbage from the page.
+ ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
+ }
+ else {
+ // Direct buffer is required as output of this method.
+ if (page.isDirect())
+ return page;
+
+ PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0,
page.array());
+
+ compactPage.limit(pageSize);
+ }
+
+ return compactPage;
+ }
+
+ /**
+ * @param compression Compression algorithm.
+ * @param compactPage Compacted page.
+ * @param compactSize Compacted page size.
+ * @param compressLevel Compression level.
+ * @return Compressed page.
+ */
+ protected ByteBuffer doCompressPage(
+ DiskPageCompression compression,
+ ByteBuffer compactPage,
+ int compactSize,
+ int compressLevel
+ ) {
+ throw new IllegalStateException("Unsupported compression: " +
compression);
}
/**
@@ -176,7 +340,39 @@ public class CompressionProcessor extends
GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public void decompressPage(ByteBuffer page, int pageSize) throws
IgniteCheckedException {
- if (PageIO.getCompressionType(page) != UNCOMPRESSED_PAGE)
- fail();
+ assert page.capacity() >= pageSize : "capacity=" + page.capacity() +
", pageSize=" + pageSize;
+
+ byte compressType = PageIO.getCompressionType(page);
+
+ if (compressType == UNCOMPRESSED_PAGE)
+ return; // Nothing to do.
+
+ short compressedSize = PageIO.getCompressedSize(page);
+ short compactSize = PageIO.getCompactedSize(page);
+
+ assert compactSize <= pageSize && compactSize >= compressedSize;
+
+ if (compressType == COMPACTED_PAGE) {
+ // Just setup bounds before restoring the page.
+ page.position(0).limit(compactSize);
+ }
+ else
+ doDecompressPage(compressType, page, compressedSize, compactSize);
+
+ PageIO io = PageIO.getPageIO(page);
+
+ if (io instanceof CompactablePageIO)
+ ((CompactablePageIO)io).restorePage(page, pageSize);
+ else {
+ assert compactSize == pageSize
+ : "Wrong compacted page size [compactSize=" + compactSize + ",
pageSize=" + pageSize + ']';
+ }
+
+ setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
+ }
+
+ /** */
+ protected void doDecompressPage(int compressType, ByteBuffer page, int
compressedSize, int compactSize) {
+ throw new IllegalStateException("Unsupported compression: " +
compressType);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
new file mode 100644
index 00000000000..10bc4f3a5a6
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import
org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX;
+
+/**
+ * Test SKIP_GARBAGE compression mode for WAL page snapshot records without
extra dependencies.
+ */
+public class WalPageRecordCompactionTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteName) throws Exception {
+ return
super.getConfiguration(igniteName).setDataStorageConfiguration(new
DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(true))
+ .setMetricsEnabled(true)
+ .setWalPageCompression(DiskPageCompression.SKIP_GARBAGE));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWalPageCompaction() throws Exception {
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ int pageSize =
ignite.configuration().getDataStorageConfiguration().getPageSize();
+
+ MetricRegistry registry =
ignite.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
+
+ LongMetric loggingRate = registry.findMetric("WalLoggingRate");
+ LongMetric walSize = registry.findMetric("WalWrittenBytes");
+
+ ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize);
+ long pageAddr = GridUnsafe.bufferAddress(pageBuf);
+ DataPageIO.VERSIONS.latest().initNewPage(pageAddr, -1, pageSize, null);
+ PageSnapshot pageSnapshot = new PageSnapshot(new FullPageId(-1, -1),
pageAddr, pageSize, pageSize);
+
+ long prevCnt;
+ long size;
+
+ do {
+ prevCnt = loggingRate.value();
+ long prevSize = walSize.value();
+
+ ignite.context().cache().context().wal().log(pageSnapshot);
+
+ size = walSize.value() - prevSize;
+ } while (loggingRate.value() - prevCnt > 1); // Ensure that no more
than one record logged.
+
+ // Check that record is compacted.
+ assertTrue("Unexpected WAL record size: " + size, size > 0 && size <
pageSize);
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index c79381ad237..bc2ececd1db 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDel
import
org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithRestartsTest;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.WalPageRecordCompactionTest;
import
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -123,6 +124,7 @@ public class IgnitePdsTestSuite5 {
GridTestUtils.addTestIfNeeded(suite,
IgniteCacheDatabaseSharedManagerSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
WalCompactionNotificationsTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
WalPageRecordCompactionTest.class, ignoredTests);
return suite;
}