This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0024e37136c IGNITE-21886 Refactor CompressionProcessorImpl, move code 
partially to ignite-core module - Fixes #11290.
0024e37136c is described below

commit 0024e37136c6da493da09830f1fe3c729753952d
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Apr 5 10:03:08 2024 +0300

    IGNITE-21886 Refactor CompressionProcessorImpl, move code partially to 
ignite-core module - Fixes #11290.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../compress/CompressionProcessorImpl.java         | 229 ++++-----------------
 .../persistence/wal/FileWriteAheadLogManager.java  |  10 +-
 .../processors/compress/CompressionHandler.java    |   4 +-
 .../processors/compress/CompressionProcessor.java  | 208 ++++++++++++++++++-
 .../wal/WalPageRecordCompactionTest.java           |  86 ++++++++
 .../ignite/testsuites/IgnitePdsTestSuite5.java     |   2 +
 6 files changed, 330 insertions(+), 209 deletions(-)

diff --git 
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
 
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
index 8fbfc44949a..831511b1b08 100644
--- 
a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
+++ 
b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java
@@ -30,9 +30,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
-import org.apache.ignite.internal.pagemem.PageUtils;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
-import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -41,16 +39,12 @@ import org.xerial.snappy.Snappy;
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
-import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
 import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
 
 /**
  * Compression processor.
  */
 public class CompressionProcessorImpl extends CompressionProcessor {
-    /** Max page size. */
-    private final ThreadLocalDirectByteBuffer compactBuf = new 
ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
-
     /** A bit more than max page size, extra space is required by compressors. 
*/
     private final ThreadLocalDirectByteBuffer compressBuf =
         new 
ThreadLocalDirectByteBuffer(maxCompressedBufferSize(MAX_PAGE_SIZE), 
NATIVE_BYTE_ORDER);
@@ -92,84 +86,6 @@ public class CompressionProcessorImpl extends 
CompressionProcessor {
         checkPunchHole(storagePath, fsBlockSize);
     }
 
-    /** {@inheritDoc} */
-    @Override public ByteBuffer compressPage(
-        ByteBuffer page,
-        int pageSize,
-        int blockSize,
-        DiskPageCompression compression,
-        int compressLevel
-    ) throws IgniteCheckedException {
-        assert compression != null && compression != 
DiskPageCompression.DISABLED : compression;
-        assert U.isPow2(blockSize) : blockSize;
-        assert page.position() == 0 && page.limit() >= pageSize;
-
-        int oldPageLimit = page.limit();
-
-        try {
-            // Page size will be less than page limit when TDE is enabled. To 
make compaction and compression work
-            // correctly we need to set limit to real page size.
-            page.limit(pageSize);
-
-            ByteBuffer compactPage = doCompactPage(page, pageSize);
-
-            int compactSize = compactPage.limit();
-
-            assert compactSize <= pageSize : compactSize;
-
-            // If no need to compress further or configured just to skip 
garbage.
-            if (compactSize < blockSize || compression == SKIP_GARBAGE)
-                return setCompactionInfo(compactPage, compactSize);
-
-            ByteBuffer compressedPage = doCompressPage(compression, 
compactPage, compactSize, compressLevel);
-
-            assert compressedPage.position() == 0;
-            int compressedSize = compressedPage.limit();
-
-            int freeCompactBlocks = (pageSize - compactSize) / blockSize;
-            int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
-
-            if (freeCompactBlocks >= freeCompressedBlocks) {
-                if (freeCompactBlocks == 0)
-                    return page; // No blocks will be released.
-
-                return setCompactionInfo(compactPage, compactSize);
-            }
-
-            return setCompressionInfo(compressedPage, compression, 
compressedSize, compactSize);
-        }
-        finally {
-            page.limit(oldPageLimit);
-        }
-    }
-
-    /**
-     * @param page Page buffer.
-     * @param pageSize Page size.
-     * @return Compacted page buffer.
-     */
-    private ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws 
IgniteCheckedException {
-        PageIO io = PageIO.getPageIO(page);
-
-        ByteBuffer compactPage = compactBuf.get();
-
-        if (io instanceof CompactablePageIO) {
-            // Drop the garbage from the page.
-            ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
-        }
-        else {
-            // Direct buffer is required as output of this method.
-            if (page.isDirect())
-                return page;
-
-            PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, 
page.array());
-
-            compactPage.limit(pageSize);
-        }
-
-        return compactPage;
-    }
-
     /** Check if filesystem actually supports punching holes. */
     private void checkPunchHole(Path storagePath, int fsBlockSz) throws 
IgniteException {
         ByteBuffer buf = null;
@@ -198,33 +114,6 @@ public class CompressionProcessorImpl extends 
CompressionProcessor {
         }
     }
 
-    /**
-     * @param page Page.
-     * @param compactSize Compacted page size.
-     * @return The given page.
-     */
-    private static ByteBuffer setCompactionInfo(ByteBuffer page, int 
compactSize) {
-        return setCompressionInfo(page, SKIP_GARBAGE, compactSize, 
compactSize);
-    }
-
-    /**
-     * @param page Page.
-     * @param compression Compression algorithm.
-     * @param compressedSize Compressed size.
-     * @param compactedSize Compact size.
-     * @return The given page.
-     */
-    private static ByteBuffer setCompressionInfo(ByteBuffer page, 
DiskPageCompression compression, int compressedSize, int compactedSize) {
-        assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : 
compressedSize;
-        assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : 
compactedSize;
-
-        PageIO.setCompressionType(page, getCompressionType(compression));
-        PageIO.setCompressedSize(page, (short)compressedSize);
-        PageIO.setCompactedSize(page, (short)compactedSize);
-
-        return page;
-    }
-
     /**
      * @param compression Compression algorithm.
      * @param compactPage Compacted page.
@@ -232,7 +121,12 @@ public class CompressionProcessorImpl extends 
CompressionProcessor {
      * @param compressLevel Compression level.
      * @return Compressed page.
      */
-    private ByteBuffer doCompressPage(DiskPageCompression compression, 
ByteBuffer compactPage, int compactSize, int compressLevel) {
+    @Override protected ByteBuffer doCompressPage(
+        DiskPageCompression compression,
+        ByteBuffer compactPage,
+        int compactSize,
+        int compressLevel
+    ) {
         switch (compression) {
             case ZSTD:
                 return compressPageZstd(compactPage, compactSize, 
compressLevel);
@@ -319,99 +213,46 @@ public class CompressionProcessorImpl extends 
CompressionProcessor {
         compactPage.limit(compactSize);
     }
 
-    /**
-     * @param compression Compression.
-     * @return Level.
-     */
-    private static byte getCompressionType(DiskPageCompression compression) {
-        if (compression == DiskPageCompression.DISABLED)
-            return UNCOMPRESSED_PAGE;
-
-        switch (compression) {
-            case ZSTD:
-                return ZSTD_COMPRESSED_PAGE;
-
-            case LZ4:
-                return LZ4_COMPRESSED_PAGE;
-
-            case SNAPPY:
-                return SNAPPY_COMPRESSED_PAGE;
-
-            case SKIP_GARBAGE:
-                return COMPACTED_PAGE;
-        }
-        throw new IllegalStateException("Unexpected compression: " + 
compression);
-    }
-
     /** {@inheritDoc} */
-    @Override public void decompressPage(ByteBuffer page, int pageSize) throws 
IgniteCheckedException {
-        assert page.capacity() >= pageSize : "capacity=" + page.capacity() + 
", pageSize=" + pageSize;
-
-        byte compressType = PageIO.getCompressionType(page);
-
-        if (compressType == UNCOMPRESSED_PAGE)
-            return; // Nothing to do.
+    @Override protected void doDecompressPage(int compressType, ByteBuffer 
page, int compressedSize, int compactSize) {
+        ByteBuffer dst = compressBuf.get();
 
-        short compressedSize = PageIO.getCompressedSize(page);
-        short compactSize = PageIO.getCompactedSize(page);
+        // Position on a part that needs to be decompressed.
+        page.limit(compressedSize)
+            .position(PageIO.COMMON_HEADER_END);
 
-        assert compactSize <= pageSize && compactSize >= compressedSize;
+        // LZ4 needs this limit to be exact.
+        dst.limit(compactSize - PageIO.COMMON_HEADER_END);
 
-        if (compressType == COMPACTED_PAGE) {
-            // Just setup bounds before restoring the page.
-            page.position(0).limit(compactSize);
-        }
-        else {
-            ByteBuffer dst = compressBuf.get();
-
-            // Position on a part that needs to be decompressed.
-            page.limit(compressedSize)
-                .position(PageIO.COMMON_HEADER_END);
-
-            // LZ4 needs this limit to be exact.
-            dst.limit(compactSize - PageIO.COMMON_HEADER_END);
-
-            switch (compressType) {
-                case ZSTD_COMPRESSED_PAGE:
-                    Zstd.decompress(dst, page);
-                    dst.flip();
-
-                    break;
+        switch (compressType) {
+            case ZSTD_COMPRESSED_PAGE:
+                Zstd.decompress(dst, page);
+                dst.flip();
 
-                case LZ4_COMPRESSED_PAGE:
-                    Lz4.decompress(page, dst);
-                    dst.flip();
+                break;
 
-                    break;
+            case LZ4_COMPRESSED_PAGE:
+                Lz4.decompress(page, dst);
+                dst.flip();
 
-                case SNAPPY_COMPRESSED_PAGE:
-                    try {
-                        Snappy.uncompress(page, dst);
-                    }
-                    catch (IOException e) {
-                        throw new IgniteException(e);
-                    }
-                    break;
-
-                default:
-                    throw new IgniteException("Unknown compression: " + 
compressType);
-            }
-
-            page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
-            page.put(dst).flip();
-            assert page.limit() == compactSize;
-        }
+                break;
 
-        PageIO io = PageIO.getPageIO(page);
+            case SNAPPY_COMPRESSED_PAGE:
+                try {
+                    Snappy.uncompress(page, dst);
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+                break;
 
-        if (io instanceof CompactablePageIO)
-            ((CompactablePageIO)io).restorePage(page, pageSize);
-        else {
-            assert compactSize == pageSize
-                : "Wrong compacted page size [compactSize=" + compactSize + ", 
pageSize=" + pageSize + ']';
+            default:
+                throw new IgniteException("Unknown compression: " + 
compressType);
         }
 
-        setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
+        page.position(PageIO.COMMON_HEADER_END).limit(compactSize);
+        page.put(dst).flip();
+        assert page.limit() == compactSize;
     }
 
     /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 12c40e81081..2c3de5fa326 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -161,8 +161,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serial
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
-import static 
org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
-import static 
org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
+import static 
org.apache.ignite.internal.processors.compress.CompressionProcessor.getCompressionLevel;
 import static 
org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty;
 import static 
org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
 
@@ -605,11 +604,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         serializerVer);
                 }
 
-                
cctx.kernalContext().compress().checkPageCompressionSupported();
+                if (pageCompression != DiskPageCompression.SKIP_GARBAGE)
+                    
cctx.kernalContext().compress().checkPageCompressionSupported();
 
-                pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != 
null ?
-                    
checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), 
pageCompression) :
-                    getDefaultCompressionLevel(pageCompression);
+                pageCompressionLevel = 
getCompressionLevel(dsCfg.getWalPageCompressionLevel(), pageCompression);
             }
         }
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
index a1dd5146aad..faa1eb7e0b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java
@@ -115,9 +115,7 @@ public class CompressionHandler {
 
         CompressionProcessor comprProc = ctx.compress();
 
-        int lvl = cfg.getDiskPageCompressionLevel() == null ?
-            CompressionProcessor.getDefaultCompressionLevel(diskPageCompr) :
-            
CompressionProcessor.checkCompressionLevelBounds(cfg.getDiskPageCompressionLevel(),
 diskPageCompr);
+        int lvl = 
CompressionProcessor.getCompressionLevel(cfg.getDiskPageCompressionLevel(), 
diskPageCompr);
 
         File dbPath = 
ctx.pdsFolderResolver().resolveFolders().persistentStoreRootPath();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
index fe760712161..051b883b1fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessor.java
@@ -23,8 +23,17 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.DiskPageCompression;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteComponentType;
+import org.apache.ignite.internal.ThreadLocalDirectByteBuffer;
+import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE;
+import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE;
+import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER;
 
 /**
  * Compression processor.
@@ -65,6 +74,9 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
     /** */
     protected static final byte SNAPPY_COMPRESSED_PAGE = 4;
 
+    /** Max page size. */
+    private final ThreadLocalDirectByteBuffer compactBuf = new 
ThreadLocalDirectByteBuffer(MAX_PAGE_SIZE, NATIVE_BYTE_ORDER);
+
     /**
      * @param ctx Kernal context.
      */
@@ -72,6 +84,16 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
         super(ctx);
     }
 
+    /**
+     * @param compressLevel Compression level.
+     * @param compression Compression algorithm.
+     * @return Compression level.
+     */
+    public static int getCompressionLevel(Integer compressLevel, 
DiskPageCompression compression) {
+        return compressLevel != null ? 
checkCompressionLevelBounds(compressLevel, compression) :
+            getDefaultCompressionLevel(compression);
+    }
+
     /**
      * @param compression Compression algorithm.
      * @return Default compression level.
@@ -86,6 +108,7 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
 
             case SNAPPY:
             case SKIP_GARBAGE:
+            case DISABLED:
                 return 0;
         }
 
@@ -134,7 +157,7 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Checks weither page compression is supported.
+     * Checks weither page compression can be used for page file storage.
      *
      * @throws IgniteCheckedException If compression is not supported.
      */
@@ -143,6 +166,8 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Checks weither page file storage supports compression.
+     *
      * @param storagePath Storage path.
      * @param pageSize Page size.
      * @throws IgniteCheckedException If compression is not supported.
@@ -151,10 +176,66 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
         fail();
     }
 
+    /**
+     * @param page Page.
+     * @param compactSize Compacted page size.
+     * @return The given page.
+     */
+    protected static ByteBuffer setCompactionInfo(ByteBuffer page, int 
compactSize) {
+        return setCompressionInfo(page, SKIP_GARBAGE, compactSize, 
compactSize);
+    }
+
+    /**
+     * @param page Page.
+     * @param compression Compression algorithm.
+     * @param compressedSize Compressed size.
+     * @param compactedSize Compact size.
+     * @return The given page.
+     */
+    protected static ByteBuffer setCompressionInfo(
+        ByteBuffer page,
+        DiskPageCompression compression,
+        int compressedSize,
+        int compactedSize
+    ) {
+        assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE : 
compressedSize;
+        assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE : 
compactedSize;
+
+        PageIO.setCompressionType(page, getCompressionType(compression));
+        PageIO.setCompressedSize(page, (short)compressedSize);
+        PageIO.setCompactedSize(page, (short)compactedSize);
+
+        return page;
+    }
+
+    /**
+     * @param compression Compression.
+     * @return Level.
+     */
+    private static byte getCompressionType(DiskPageCompression compression) {
+        if (compression == DiskPageCompression.DISABLED)
+            return UNCOMPRESSED_PAGE;
+
+        switch (compression) {
+            case ZSTD:
+                return ZSTD_COMPRESSED_PAGE;
+
+            case LZ4:
+                return LZ4_COMPRESSED_PAGE;
+
+            case SNAPPY:
+                return SNAPPY_COMPRESSED_PAGE;
+
+            case SKIP_GARBAGE:
+                return COMPACTED_PAGE;
+        }
+        throw new IllegalStateException("Unexpected compression: " + 
compression);
+    }
+
     /**
      * @param page Page buffer.
      * @param pageSize Page size.
-     * @param storeBlockSize Store block size.
+     * @param blockSize Store block size.
      * @param compression Compression algorithm.
      * @param compressLevel Compression level.
      * @return Possibly compressed buffer.
@@ -163,11 +244,94 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
     public ByteBuffer compressPage(
         ByteBuffer page,
         int pageSize,
-        int storeBlockSize,
+        int blockSize,
         DiskPageCompression compression,
         int compressLevel
     ) throws IgniteCheckedException {
-        return fail();
+        assert compression != null && compression != 
DiskPageCompression.DISABLED : compression;
+        assert U.isPow2(blockSize) : blockSize;
+        assert page.position() == 0 && page.limit() >= pageSize;
+
+        int oldPageLimit = page.limit();
+
+        try {
+            // Page size will be less than page limit when TDE is enabled. To 
make compaction and compression work
+            // correctly we need to set limit to real page size.
+            page.limit(pageSize);
+
+            ByteBuffer compactPage = doCompactPage(page, pageSize);
+
+            int compactSize = compactPage.limit();
+
+            assert compactSize <= pageSize : compactSize;
+
+            // If no need to compress further or configured just to skip 
garbage.
+            if (compactSize < blockSize || compression == SKIP_GARBAGE)
+                return setCompactionInfo(compactPage, compactSize);
+
+            ByteBuffer compressedPage = doCompressPage(compression, 
compactPage, compactSize, compressLevel);
+
+            assert compressedPage.position() == 0;
+            int compressedSize = compressedPage.limit();
+
+            int freeCompactBlocks = (pageSize - compactSize) / blockSize;
+            int freeCompressedBlocks = (pageSize - compressedSize) / blockSize;
+
+            if (freeCompactBlocks >= freeCompressedBlocks) {
+                if (freeCompactBlocks == 0)
+                    return page; // No blocks will be released.
+
+                return setCompactionInfo(compactPage, compactSize);
+            }
+
+            return setCompressionInfo(compressedPage, compression, 
compressedSize, compactSize);
+        }
+        finally {
+            page.limit(oldPageLimit);
+        }
+    }
+
+    /**
+     * @param page Page buffer.
+     * @param pageSize Page size.
+     * @return Compacted page buffer.
+     */
+    protected ByteBuffer doCompactPage(ByteBuffer page, int pageSize) throws 
IgniteCheckedException {
+        PageIO io = PageIO.getPageIO(page);
+
+        ByteBuffer compactPage = compactBuf.get();
+
+        if (io instanceof CompactablePageIO) {
+            // Drop the garbage from the page.
+            ((CompactablePageIO)io).compactPage(page, compactPage, pageSize);
+        }
+        else {
+            // Direct buffer is required as output of this method.
+            if (page.isDirect())
+                return page;
+
+            PageUtils.putBytes(GridUnsafe.bufferAddress(compactPage), 0, 
page.array());
+
+            compactPage.limit(pageSize);
+        }
+
+        return compactPage;
+    }
+
+    /**
+     * @param compression Compression algorithm.
+     * @param compactPage Compacted page.
+     * @param compactSize Compacted page size.
+     * @param compressLevel Compression level.
+     * @return Compressed page.
+     */
+    protected ByteBuffer doCompressPage(
+        DiskPageCompression compression,
+        ByteBuffer compactPage,
+        int compactSize,
+        int compressLevel
+    ) {
+        throw new IllegalStateException("Unsupported compression: " + 
compression);
     }
 
     /**
@@ -176,7 +340,39 @@ public class CompressionProcessor extends 
GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     public void decompressPage(ByteBuffer page, int pageSize) throws 
IgniteCheckedException {
-        if (PageIO.getCompressionType(page) != UNCOMPRESSED_PAGE)
-            fail();
+        assert page.capacity() >= pageSize : "capacity=" + page.capacity() + 
", pageSize=" + pageSize;
+
+        byte compressType = PageIO.getCompressionType(page);
+
+        if (compressType == UNCOMPRESSED_PAGE)
+            return; // Nothing to do.
+
+        short compressedSize = PageIO.getCompressedSize(page);
+        short compactSize = PageIO.getCompactedSize(page);
+
+        assert compactSize <= pageSize && compactSize >= compressedSize;
+
+        if (compressType == COMPACTED_PAGE) {
+            // Just setup bounds before restoring the page.
+            page.position(0).limit(compactSize);
+        }
+        else
+            doDecompressPage(compressType, page, compressedSize, compactSize);
+
+        PageIO io = PageIO.getPageIO(page);
+
+        if (io instanceof CompactablePageIO)
+            ((CompactablePageIO)io).restorePage(page, pageSize);
+        else {
+            assert compactSize == pageSize
+                : "Wrong compacted page size [compactSize=" + compactSize + ", 
pageSize=" + pageSize + ']';
+        }
+
+        setCompressionInfo(page, DiskPageCompression.DISABLED, 0, 0);
+    }
+
+    /** */
+    protected void doDecompressPage(int compressType, ByteBuffer page, int 
compressedSize, int compactSize) {
+        throw new IllegalStateException("Unsupported compression: " + 
compressType);
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
new file mode 100644
index 00000000000..10bc4f3a5a6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalPageRecordCompactionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.DiskPageCompression;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX;
+
+/**
+ * Test SKIP_GARBAGE compression mode for WAL page snapshot records without 
extra dependencies.
+ */
+public class WalPageRecordCompactionTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteName) throws Exception {
+        return 
super.getConfiguration(igniteName).setDataStorageConfiguration(new 
DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true))
+            .setMetricsEnabled(true)
+            .setWalPageCompression(DiskPageCompression.SKIP_GARBAGE));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testWalPageCompaction() throws Exception {
+        IgniteEx ignite = startGrid(0);
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        int pageSize = 
ignite.configuration().getDataStorageConfiguration().getPageSize();
+
+        MetricRegistry registry = 
ignite.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
+
+        LongMetric loggingRate = registry.findMetric("WalLoggingRate");
+        LongMetric walSize = registry.findMetric("WalWrittenBytes");
+
+        ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize);
+        long pageAddr = GridUnsafe.bufferAddress(pageBuf);
+        DataPageIO.VERSIONS.latest().initNewPage(pageAddr, -1, pageSize, null);
+        PageSnapshot pageSnapshot = new PageSnapshot(new FullPageId(-1, -1), 
pageAddr, pageSize, pageSize);
+
+        long prevCnt;
+        long size;
+
+        do {
+            prevCnt = loggingRate.value();
+            long prevSize = walSize.value();
+
+            ignite.context().cache().context().wal().log(pageSnapshot);
+
+            size = walSize.value() - prevSize;
+        } while (loggingRate.value() - prevCnt > 1); // Ensure that no more 
than one record logged.
+
+        // Check that record is compacted.
+        assertTrue("Unexpected WAL record size: " + size, size > 0 && size < 
pageSize);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
index c79381ad237..bc2ececd1db 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite5.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDel
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithRestartsTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalPageRecordCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -123,6 +124,7 @@ public class IgnitePdsTestSuite5 {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteCacheDatabaseSharedManagerSelfTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
WalCompactionNotificationsTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
WalPageRecordCompactionTest.class, ignoredTests);
 
         return suite;
     }

Reply via email to