http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-LICENSE.txt ---------------------------------------------------------------------- diff --git a/modules/compress/licenses/lz4-LICENSE.txt b/modules/compress/licenses/lz4-LICENSE.txt new file mode 100644 index 0000000..c221aeb --- /dev/null +++ b/modules/compress/licenses/lz4-LICENSE.txt @@ -0,0 +1,11 @@ +This repository uses 2 different licenses : +- all files in the `lib` directory use a BSD 2-Clause license +- all other files use a GPLv2 license, unless explicitly stated otherwise + +Relevant license is reminded at the top of each source file, +and with presence of COPYING or LICENSE file in associated directories. + +This model is selected to emphasize that +files in the `lib` directory are designed to be included into 3rd party applications, +while all other files, in `programs`, `tests` or `examples`, +receive more limited attention and support for such scenario.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/licenses/lz4-java-LICENSE.txt ---------------------------------------------------------------------- diff --git a/modules/compress/licenses/lz4-java-LICENSE.txt b/modules/compress/licenses/lz4-java-LICENSE.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/modules/compress/licenses/lz4-java-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/pom.xml ---------------------------------------------------------------------- diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml new file mode 100644 index 0000000..876a121 --- /dev/null +++ b/modules/compress/pom.xml @@ -0,0 +1,113 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-compress</artifactId> + <version>2.7.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.github.jnr</groupId> + <artifactId>jnr-posix</artifactId> + <version>${jnr.posix.version}</version> + </dependency> + + <dependency> + <groupId>com.github.luben</groupId> + <artifactId>zstd-jni</artifactId> + <version>${zstd.version}</version> + </dependency> + + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>${lz4.version}</version> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.thoughtworks.xstream</groupId> + <artifactId>xstream</artifactId> + <version>1.4.8</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java new file mode 100644 index 0000000..2553371 --- /dev/null +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.xerial.snappy.Snappy; + +import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; +import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; +import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER; + +/** + * Compression processor. + */ +public class CompressionProcessorImpl extends CompressionProcessor { + /** Max page size. */ + private final ThreadLocalByteBuffer compactBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE); + + /** A bit more than max page size. */ + private final ThreadLocalByteBuffer compressBuf = new ThreadLocalByteBuffer(MAX_PAGE_SIZE + 1024); + + /** + * @param ctx Kernal context. + */ + @SuppressWarnings("WeakerAccess") + public CompressionProcessorImpl(GridKernalContext ctx) { + super(ctx); + } + + /** + * @param cap Capacity. + * @return Direct byte buffer. + */ + static ByteBuffer allocateDirectBuffer(int cap) { + return ByteBuffer.allocateDirect(cap).order(NATIVE_BYTE_ORDER); + } + + /** {@inheritDoc} */ + @Override public void checkPageCompressionSupported(Path storagePath, int pageSize) throws IgniteCheckedException { + if (!U.isLinux()) + throw new IgniteCheckedException("Currently page compression is supported only for Linux."); + + FileSystemUtils.checkSupported(); + + int fsBlockSize = FileSystemUtils.getFileSystemBlockSize(storagePath); + + if (fsBlockSize <= 0) + throw new IgniteCheckedException("Failed to get file system block size: " + storagePath); + + if (!U.isPow2(fsBlockSize)) + throw new IgniteCheckedException("Storage block size must be power of 2: " + fsBlockSize); + + if (pageSize < fsBlockSize * 2) { + throw new IgniteCheckedException("Page size (now configured to " + pageSize + " bytes) " + + "must be at least 2 times larger than the underlying storage block size (detected to be " + fsBlockSize + + " bytes at '" + storagePath + "') for page compression."); + } + } + + /** {@inheritDoc} */ + @Override public ByteBuffer compressPage( + ByteBuffer page, + int pageSize, + int blockSize, + DiskPageCompression compression, + int compressLevel + ) throws IgniteCheckedException { + assert compression != null; + assert U.isPow2(pageSize): pageSize; + assert U.isPow2(blockSize): blockSize; + assert page.position() == 0 && page.limit() == pageSize; + + PageIO io = PageIO.getPageIO(page); + + if (!(io instanceof CompactablePageIO)) + return page; + + ByteBuffer compactPage = compactBuf.get(); + + // Drop the garbage from the page. + ((CompactablePageIO)io).compactPage(page, compactPage, pageSize); + page.clear(); + + int compactSize = compactPage.limit(); + + assert compactSize <= pageSize: compactSize; + + // If no need to compress further or configured just to skip garbage. + if (compactSize < blockSize || compression == SKIP_GARBAGE) + return setCompactionInfo(compactPage, compactSize); + + ByteBuffer compressedPage = doCompressPage(compression, compactPage, compactSize, compressLevel); + + assert compressedPage.position() == 0; + int compressedSize = compressedPage.limit(); + + int freeCompactBlocks = (pageSize - compactSize) / blockSize; + int freeCompressedBlocks = (pageSize - compressedSize) / blockSize; + + if (freeCompactBlocks >= freeCompressedBlocks) { + if (freeCompactBlocks == 0) + return page; // No blocks will be released. + + return setCompactionInfo(compactPage, compactSize); + } + + return setCompressionInfo(compressedPage, compression, compressedSize, compactSize); + } + + /** + * @param page Page. + * @param compactSize Compacted page size. + * @return The given page. + */ + private static ByteBuffer setCompactionInfo(ByteBuffer page, int compactSize) { + return setCompressionInfo(page, SKIP_GARBAGE, compactSize, compactSize); + } + + /** + * @param page Page. + * @param compression Compression algorithm. + * @param compressedSize Compressed size. + * @param compactedSize Compact size. + * @return The given page. + */ + private static ByteBuffer setCompressionInfo(ByteBuffer page, DiskPageCompression compression, int compressedSize, int compactedSize) { + assert compressedSize >= 0 && compressedSize <= Short.MAX_VALUE: compressedSize; + assert compactedSize >= 0 && compactedSize <= Short.MAX_VALUE: compactedSize; + + PageIO.setCompressionType(page, getCompressionType(compression)); + PageIO.setCompressedSize(page, (short)compressedSize); + PageIO.setCompactedSize(page, (short)compactedSize); + + return page; + } + + /** + * @param compression Compression algorithm. + * @param compactPage Compacted page. + * @param compactSize Compacted page size. + * @param compressLevel Compression level. + * @return Compressed page. + */ + private ByteBuffer doCompressPage(DiskPageCompression compression, ByteBuffer compactPage, int compactSize, int compressLevel) { + switch (compression) { + case ZSTD: + return compressPageZstd(compactPage, compactSize, compressLevel); + + case LZ4: + return compressPageLz4(compactPage, compactSize, compressLevel); + + case SNAPPY: + return compressPageSnappy(compactPage, compactSize); + } + throw new IllegalStateException("Unsupported compression: " + compression); + } + + /** + * @param compactPage Compacted page. + * @param compactSize Compacted page size. + * @param compressLevel Compression level. + * @return Compressed page. + */ + private ByteBuffer compressPageLz4(ByteBuffer compactPage, int compactSize, int compressLevel) { + LZ4Compressor compressor = Lz4.getCompressor(compressLevel); + + ByteBuffer compressedPage = compressBuf.get(); + + copyPageHeader(compactPage, compressedPage, compactSize); + compressor.compress(compactPage, compressedPage); + + compactPage.flip(); + compressedPage.flip(); + + return compressedPage; + } + + /** + * @param compactPage Compacted page. + * @param compactSize Compacted page size. + * @param compressLevel Compression level. + * @return Compressed page. + */ + private ByteBuffer compressPageZstd(ByteBuffer compactPage, int compactSize, int compressLevel) { + ByteBuffer compressedPage = compressBuf.get(); + + copyPageHeader(compactPage, compressedPage, compactSize); + Zstd.compress(compressedPage, compactPage, compressLevel); + + compactPage.flip(); + compressedPage.flip(); + + return compressedPage; + } + + /** + * @param compactPage Compacted page. + * @param compactSize Compacted page size. + * @return Compressed page. + */ + private ByteBuffer compressPageSnappy(ByteBuffer compactPage, int compactSize) { + ByteBuffer compressedPage = compressBuf.get(); + + copyPageHeader(compactPage, compressedPage, compactSize); + + try { + int compressedSize = Snappy.compress(compactPage, compressedPage); + assert compressedPage.limit() == PageIO.COMMON_HEADER_END + compressedSize; + } + catch (IOException e) { + throw new IgniteException("Failed to compress page with Snappy.", e); + } + + compactPage.position(0); + compressedPage.position(0); + + return compressedPage; + } + + /** + * @param compactPage Compacted page. + * @param compressedPage Compressed page. + * @param compactSize Compacted page size. + */ + private static void copyPageHeader(ByteBuffer compactPage, ByteBuffer compressedPage, int compactSize) { + compactPage.limit(PageIO.COMMON_HEADER_END); + compressedPage.put(compactPage); + compactPage.limit(compactSize); + } + + /** + * @param compression Compression. + * @return Level. + */ + private static byte getCompressionType(DiskPageCompression compression) { + if (compression == null) + return UNCOMPRESSED_PAGE; + + switch (compression) { + case ZSTD: + return ZSTD_COMPRESSED_PAGE; + + case LZ4: + return LZ4_COMPRESSED_PAGE; + + case SNAPPY: + return SNAPPY_COMPRESSED_PAGE; + + case SKIP_GARBAGE: + return COMPACTED_PAGE; + } + throw new IllegalStateException("Unexpected compression: " + compression); + } + + /** {@inheritDoc} */ + @Override public void decompressPage(ByteBuffer page, int pageSize) throws IgniteCheckedException { + assert page.capacity() == pageSize; + + byte compressType = PageIO.getCompressionType(page); + + if (compressType == UNCOMPRESSED_PAGE) + return; // Nothing to do. + + short compressedSize = PageIO.getCompressedSize(page); + short compactSize = PageIO.getCompactedSize(page); + + assert compactSize <= pageSize && compactSize >= compressedSize; + + if (compressType == COMPACTED_PAGE) { + // Just setup bounds before restoring the page. + page.position(0).limit(compactSize); + } + else { + ByteBuffer dst = compressBuf.get(); + + // Position on a part that needs to be decompressed. + page.limit(compressedSize) + .position(PageIO.COMMON_HEADER_END); + + // LZ4 needs this limit to be exact. + dst.limit(compactSize - PageIO.COMMON_HEADER_END); + + switch (compressType) { + case ZSTD_COMPRESSED_PAGE: + Zstd.decompress(dst, page); + dst.flip(); + + break; + + case LZ4_COMPRESSED_PAGE: + Lz4.decompress(page, dst); + dst.flip(); + + break; + + case SNAPPY_COMPRESSED_PAGE: + try { + Snappy.uncompress(page, dst); + } + catch (IOException e) { + throw new IgniteException(e); + } + break; + + default: + throw new IgniteException("Unknown compression: " + compressType); + } + + page.position(PageIO.COMMON_HEADER_END).limit(compactSize); + page.put(dst).flip(); + assert page.limit() == compactSize; + } + + CompactablePageIO io = PageIO.getPageIO(page); + + io.restorePage(page, pageSize); + + setCompressionInfo(page, null, 0, 0); + } + + /** */ + static class Lz4 { + /** */ + static final LZ4Factory factory = LZ4Factory.fastestInstance(); + + /** */ + static final LZ4FastDecompressor decompressor = factory.fastDecompressor(); + + /** */ + static final LZ4Compressor fastCompressor = factory.fastCompressor(); + + /** + * @param level Compression level. + * @return Compressor. + */ + static LZ4Compressor getCompressor(int level) { + assert level >= 0 && level <= 17: level; + return level == 0 ? fastCompressor : factory.highCompressor(level); + } + + /** + * @param page Page. + * @param dst Destination buffer. + */ + static void decompress(ByteBuffer page, ByteBuffer dst) { + decompressor.decompress(page, dst); + } + } + + /** + */ + static final class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer> { + /** */ + final int size; + + /** + * @param size Size. + */ + ThreadLocalByteBuffer(int size) { + this.size = size; + } + + /** {@inheritDoc} */ + @Override protected ByteBuffer initialValue() { + return allocateDirectBuffer(size); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer get() { + ByteBuffer buf = super.get(); + buf.clear(); + return buf; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java new file mode 100644 index 0000000..22d4926 --- /dev/null +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemLinux.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import jnr.ffi.LibraryLoader; +import org.apache.ignite.IgniteException; + +/** + * Linux native file system API. + */ +public final class NativeFileSystemLinux extends NativeFileSystemPosix { + /** + * default is extend size + */ + public static final int FALLOC_FL_KEEP_SIZE = 0x01; + + /** + * de-allocates range + */ + public static final int FALLOC_FL_PUNCH_HOLE = 0x02; + + /** + * reserved codepoint + */ + public static final int FALLOC_FL_NO_HIDE_STALE = 0x04; + + /** + * FALLOC_FL_COLLAPSE_RANGE is used to remove a range of a file + * without leaving a hole in the file. The contents of the file beyond + * the range being removed is appended to the start offset of the range + * being removed (i.e. the hole that was punched is "collapsed"), + * resulting in a file layout that looks like the range that was + * removed never existed. As such collapsing a range of a file changes + * the size of the file, reducing it by the same length of the range + * that has been removed by the operation. + * + * Different filesystems may implement different limitations on the + * granularity of the operation. Most will limit operations to + * filesystem block size boundaries, but this boundary may be larger or + * smaller depending on the filesystem and/or the configuration of the + * filesystem or file. + * + * Attempting to collapse a range that crosses the end of the file is + * considered an illegal operation - just use ftruncate(2) if you need + * to collapse a range that crosses EOF. + */ + public static final int FALLOC_FL_COLLAPSE_RANGE = 0x08; + + /** + * FALLOC_FL_ZERO_RANGE is used to convert a range of file to zeros preferably + * without issuing data IO. Blocks should be preallocated for the regions that + * span holes in the file, and the entire range is preferable converted to + * unwritten extents - even though file system may choose to zero out the + * extent or do whatever which will result in reading zeros from the range + * while the range remains allocated for the file. + * + * This can be also used to preallocate blocks past EOF in the same way as + * with fallocate. Flag FALLOC_FL_KEEP_SIZE should cause the inode + * size to remain the same. + */ + public static final int FALLOC_FL_ZERO_RANGE = 0x10; + + /** + * FALLOC_FL_INSERT_RANGE is use to insert space within the file size without + * overwriting any existing data. The contents of the file beyond offset are + * shifted towards right by len bytes to create a hole. As such, this + * operation will increase the size of the file by len bytes. + * + * Different filesystems may implement different limitations on the granularity + * of the operation. Most will limit operations to filesystem block size + * boundaries, but this boundary may be larger or smaller depending on + * the filesystem and/or the configuration of the filesystem or file. + * + * Attempting to insert space using this flag at OR beyond the end of + * the file is considered an illegal operation - just use ftruncate(2) or + * fallocate(2) with mode 0 for such type of operations. + */ + public static final int FALLOC_FL_INSERT_RANGE = 0x20; + + /** + * FALLOC_FL_UNSHARE_RANGE is used to unshare shared blocks within the + * file size without overwriting any existing data. The purpose of this + * call is to preemptively reallocate any blocks that are subject to + * copy-on-write. + * + * Different filesystems may implement different limitations on the + * granularity of the operation. Most will limit operations to filesystem + * block size boundaries, but this boundary may be larger or smaller + * depending on the filesystem and/or the configuration of the filesystem + * or file. + * + * This flag can only be used with allocate-mode fallocate, which is + * to say that it cannot be used with the punch, zero, collapse, or + * insert range modes. + */ + public static final int FALLOC_FL_UNSHARE_RANGE = 0x40; + + /** */ + private static final LinuxNativeLibC libc = LibraryLoader.create(LinuxNativeLibC.class) + .failImmediately().load("c"); + + /** {@inheritDoc} */ + @Override public void punchHole(int fd, long off, long len) { + int res = libc.fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off, len); + + if (res != 0) + throw new IgniteException("errno: " + res); + } + + /** + */ + public interface LinuxNativeLibC { + /** + * Allows the caller to directly manipulate the allocated + * disk space for the file referred to by fd for the byte range starting + * at {@code off} offset and continuing for {@code len} bytes. + * + * @param fd file descriptor. + * @param mode determines the operation to be performed on the given range. + * @param off required position offset. + * @param len required length. + * @return On success, fallocate() returns zero. On error, -1 is returned and + * {@code errno} is set to indicate the error. + */ + int fallocate(int fd, int mode, long off, long len); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java new file mode 100644 index 0000000..fcf485f --- /dev/null +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/NativeFileSystemPosix.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.nio.file.Path; +import jnr.posix.FileStat; +import jnr.posix.POSIX; +import jnr.posix.POSIXFactory; + +/** + * Posix file system API. + */ +public class NativeFileSystemPosix implements NativeFileSystem { + /** */ + private static POSIX posix = POSIXFactory.getPOSIX(); + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize(Path path) { + FileStat stat = posix.stat(path.toString()); + return Math.toIntExact(stat.blockSize()); + } + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize(int fd) { + FileStat stat = posix.fstat(fd); + return Math.toIntExact(stat.blockSize()); + } + + /** {@inheritDoc} */ + @Override public long getSparseFileSize(int fd) { + FileStat stat = posix.fstat(fd); + return stat.blocks() * 512; + } + + /** {@inheritDoc} */ + @Override public void punchHole(int fd, long off, long len) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java new file mode 100644 index 0000000..f660426 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java @@ -0,0 +1,1021 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.SimpleDataPageIO; +import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.testframework.junits.GridTestKernalContext; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.configuration.DiskPageCompression.LZ4; +import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; +import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY; +import static org.apache.ignite.configuration.DiskPageCompression.ZSTD; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.UNCOMPRESSED_PAGE; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer; +import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestInnerIO.INNER_IO; +import static org.apache.ignite.internal.processors.compress.CompressionProcessorTest.TestLeafIO.LEAF_IO; +import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress; + +/** + */ +public class CompressionProcessorTest extends GridCommonAbstractTest { + /** */ + private static final int ITEM_SIZE = 6; // To fill the whole page. + + /** */ + private int blockSize = 16; + + /** */ + private int pageSize = 4 * 1024; + + /** */ + private DiskPageCompression compression; + + /** */ + private int compressLevel; + + /** */ + private CompressionProcessor p; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + PageIO.registerTest(INNER_IO, LEAF_IO); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() { + p = new CompressionProcessorImpl(new GridTestKernalContext(log)); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageCompact16() throws IgniteCheckedException { + blockSize = 16; + compression = SKIP_GARBAGE; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageCompact128() throws IgniteCheckedException { + blockSize = 128; + compression = SKIP_GARBAGE; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageCompact1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SKIP_GARBAGE; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageCompact2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SKIP_GARBAGE; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageZstd16() throws IgniteCheckedException { + blockSize = 16; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageZstd128() throws IgniteCheckedException { + blockSize = 128; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageZstd1k() throws IgniteCheckedException { + blockSize = 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageZstd2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageSnappy16() throws IgniteCheckedException { + blockSize = 16; + compression = SNAPPY; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageSnappy128() throws IgniteCheckedException { + blockSize = 128; + compression = SNAPPY; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageSnappy1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SNAPPY; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageSnappy2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SNAPPY; + + doTestDataPage(); + } + + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Fast16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Fast128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Fast1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Fast2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Slow16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Slow128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Slow1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testDataPageLz4Slow2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestDataPage(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageCompact16() throws IgniteCheckedException { + blockSize = 16; + compression = SKIP_GARBAGE; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageCompact16() throws IgniteCheckedException { + blockSize = 16; + compression = SKIP_GARBAGE; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageZstd16() throws IgniteCheckedException { + blockSize = 16; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageZstd16() throws IgniteCheckedException { + blockSize = 16; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Fast16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Fast16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Slow16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Slow16() throws IgniteCheckedException { + blockSize = 16; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageSnappy16() throws IgniteCheckedException { + blockSize = 16; + compression = SNAPPY; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageSnappy16() throws IgniteCheckedException { + blockSize = 16; + compression = SNAPPY; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageCompact128() throws IgniteCheckedException { + blockSize = 128; + compression = SKIP_GARBAGE; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageCompact128() throws IgniteCheckedException { + blockSize = 128; + compression = SKIP_GARBAGE; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageZstd128() throws IgniteCheckedException { + blockSize = 128; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageZstd128() throws IgniteCheckedException { + blockSize = 128; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Fast128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Fast128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Slow128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Slow128() throws IgniteCheckedException { + blockSize = 128; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageSnappy128() throws IgniteCheckedException { + blockSize = 128; + compression = SNAPPY; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageSnappy128() throws IgniteCheckedException { + blockSize = 128; + compression = SNAPPY; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageCompact1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SKIP_GARBAGE; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageCompact1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SKIP_GARBAGE; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageZstd1k() throws IgniteCheckedException { + blockSize = 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageZstd1k() throws IgniteCheckedException { + blockSize = 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Fast1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Fast1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Slow1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Slow1k() throws IgniteCheckedException { + blockSize = 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageSnappy1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SNAPPY; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageSnappy1k() throws IgniteCheckedException { + blockSize = 1024; + compression = SNAPPY; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageCompact2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SKIP_GARBAGE; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageCompact2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SKIP_GARBAGE; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageZstd2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageZstd2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = ZSTD; + compressLevel = ZSTD_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Fast2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Fast2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MIN_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageLz4Slow2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageLz4Slow2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = LZ4; + compressLevel = LZ4_MAX_LEVEL; + + doTestBTreePage(LEAF_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInnerPageSnappy2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SNAPPY; + + doTestBTreePage(INNER_IO); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testLeafPageSnappy2k() throws IgniteCheckedException { + blockSize = 2 * 1024; + compression = SNAPPY; + + doTestBTreePage(LEAF_IO); + } + + /** + * @param io Page IO. + * @throws IgniteCheckedException If failed. + */ + private void doTestBTreePage(BPlusIO<byte[]> io) throws IgniteCheckedException { + Random rnd = ThreadLocalRandom.current(); + + final byte[][] rows = new byte[3][io.getItemSize()]; + + for (int i = 0; i < rows.length; i++) + rnd.nextBytes(rows[i]); + + ByteBuffer page = allocateDirectBuffer(pageSize); + long pageAddr = bufferAddress(page); + + long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 171717); + + io.initNewPage(pageAddr, pageId, pageSize); + + checkIo(io, page); + + Function<ByteBuffer, List<?>> getContents = (buf) -> { + long addr = bufferAddress(buf); + + int cnt = io.getCount(addr); + + List<Object> list = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + if (!io.isLeaf()) + list.add(((BPlusInnerIO)io).getLeft(addr, i)); + + try { + list.add(new Bytes(io.getLookupRow(null, addr, i))); + } + catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + + if (!io.isLeaf()) + list.add(((BPlusInnerIO)io).getRight(addr, i)); + } + + return list; + }; + + // Empty page. + checkCompressDecompress(page, getContents, false); + + int cnt = io.getMaxCount(pageAddr, pageSize); + + for (int i = 0; i < cnt; i++) { + byte[] row = rows[rnd.nextInt(rows.length)]; + io.insert(pageAddr, i, row, row, 777_000 + i, false); + } + + if (io.isLeaf()) + assertEquals(pageSize, io.getItemsEnd(pageAddr)); // Page must be full. + + // Full page. + checkCompressDecompress(page, getContents, io.isLeaf()); + + io.setCount(pageAddr, cnt / 2); + + // Half page. + checkCompressDecompress(page, getContents, false); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void doTestDataPage() throws IgniteCheckedException { + Random rnd = ThreadLocalRandom.current(); + + final byte[][] rows = new byte[][]{ + new byte[17], new byte[37], new byte[71] + }; + + for (int i = 0; i < rows.length; i++) + rnd.nextBytes(rows[i]); + + ByteBuffer page = allocateDirectBuffer(pageSize); + long pageAddr = bufferAddress(page); + + SimpleDataPageIO io = SimpleDataPageIO.VERSIONS.latest(); + + long pageId = PageIdUtils.pageId(PageIdAllocator.MAX_PARTITION_ID, PageIdAllocator.FLAG_DATA, 171717); + + io.initNewPage(pageAddr, pageId, pageSize); + + checkIo(io, page); + + Function<ByteBuffer,List<Bytes>> getContents = (buf) -> { + try { + long addr = bufferAddress(buf); + + return io.forAllItems(addr, (link) -> { + DataPagePayload payload = io.readPayload(addr, PageIdUtils.itemId(link), pageSize); + + return new Bytes(payload.getBytes(addr)); + }); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }; + + // Empty data page. + checkCompressDecompress(page, getContents, false); + + GridIntList itemIds = new GridIntList(); + + for (;;) { + byte[] row = rows[rnd.nextInt(rows.length)]; + + if (io.getFreeSpace(pageAddr) < row.length) + break; + + itemIds.add(io.addRow(pageAddr, row, pageSize)); + } + + int freeSpace = io.getFreeSpace(pageAddr); + + if (freeSpace != 0) { + byte[] lastRow = new byte[freeSpace]; + rnd.nextBytes(lastRow); + + io.addRowFragment(pageId, pageAddr, lastRow, 777L, pageSize); + + assertEquals(0, io.getRealFreeSpace(pageAddr)); + } + + // Full data page. + checkCompressDecompress(page, getContents, io.getRealFreeSpace(pageAddr) == 0); + + for (int i = 0; i < itemIds.size(); i += 2) + io.removeRow(pageAddr, itemIds.get(i), pageSize); + + // Half-filled data page. + checkCompressDecompress(page, getContents, false); + } + + private void checkIo(PageIO io, ByteBuffer page) throws IgniteCheckedException { + assertSame(io, PageIO.getPageIO(bufferAddress(page))); + assertSame(io, PageIO.getPageIO(page)); + } + + private void checkCompressDecompress(ByteBuffer page, Function<ByteBuffer, ?> getPageContents, boolean fullPage) + throws IgniteCheckedException { + PageIO.setCrc(page, 0xABCDEF13); + + long pageId = PageIO.getPageId(page); + PageIO io = PageIO.getPageIO(page); + + ByteBuffer compressed = p.compressPage(page, pageSize, blockSize, compression, compressLevel); + + int compressedSize = PageIO.getCompressedSize(compressed); + + assertNotSame(page, compressed); // This is generally possible but not interesting in this test. + + assertTrue(compressedSize > 0); + assertTrue(compressedSize <= pageSize); + assertEquals(compressedSize, compressed.limit()); + + if (!fullPage || compression != SKIP_GARBAGE) + assertTrue(pageSize > compressedSize); + + assertEquals(0, compressed.position()); + + checkIo(io, compressed); + assertEquals(0, page.position()); + assertEquals(pageSize, page.limit()); + + info(io.getClass().getSimpleName() + " " + compression + " " + compressLevel + ": " + compressedSize + "/" + pageSize); + + if (!fullPage || compression != SKIP_GARBAGE) + assertTrue(compressedSize < pageSize); + + assertEquals(pageId, PageIO.getPageId(compressed)); + + ByteBuffer decompress = allocateDirectBuffer(pageSize); + decompress.put(compressed).clear(); + + p.decompressPage(decompress, pageSize); + + assertEquals(0, decompress.position()); + assertEquals(pageSize, decompress.limit()); + + checkIo(io, decompress); + assertEquals(UNCOMPRESSED_PAGE, PageIO.getCompressionType(page)); + assertEquals(0, PageIO.getCompressedSize(page)); + assertEquals(0, PageIO.getCompactedSize(page)); + + assertTrue(Arrays.equals(getPageCommonHeader(page), getPageCommonHeader(decompress))); + assertEquals(getPageContents.apply(page), getPageContents.apply(decompress)); + } + + /** + * @param page Page. + * @return Page header. + */ + private static byte[] getPageCommonHeader(ByteBuffer page) { + return PageUtils.getBytes(GridUnsafe.bufferAddress(page), 0, PageIO.COMMON_HEADER_END); + } + + /** + */ + private static class Bytes { + /** */ + private final byte[] bytes; + + /** + * @param bytes Bytes. + */ + private Bytes(byte[] bytes) { + assert bytes != null; + this.bytes = bytes; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Bytes bytes1 = (Bytes)o; + + return Arrays.equals(bytes, bytes1.bytes); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Arrays.hashCode(bytes); + } + } + + /** + */ + static class TestLeafIO extends BPlusLeafIO<byte[]> { + /** */ + static final TestLeafIO LEAF_IO = new TestLeafIO(); + + /** + */ + TestLeafIO() { + super(29_501, 1, ITEM_SIZE); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, byte[] row) { + PageUtils.putBytes(pageAddr, off, row); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr, + int srcIdx) throws IgniteCheckedException { + storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx)); + } + + /** {@inheritDoc} */ + @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) { + return PageUtils.getBytes(pageAddr, offset(idx), itemSize); + } + } + + /** + */ + static class TestInnerIO extends BPlusInnerIO<byte[]> { + /** */ + static TestInnerIO INNER_IO = new TestInnerIO(); + + /** + */ + TestInnerIO() { + super(29_502, 1, true, ITEM_SIZE); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, byte[] row) { + PageUtils.putBytes(pageAddr, off, row); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<byte[]> srcIo, long srcPageAddr, + int srcIdx) throws IgniteCheckedException { + storeByOffset(dstPageAddr, offset(dstIdx), srcIo.getLookupRow(null, srcPageAddr, srcIdx)); + } + + /** {@inheritDoc} */ + @Override public byte[] getLookupRow(BPlusTree<byte[],?> tree, long pageAddr, int idx) { + return PageUtils.getBytes(pageAddr, offset(idx), itemSize); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java new file mode 100644 index 0000000..658a5d2 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationAsyncTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; + +/** + */ +public class DiskPageCompressionIntegrationAsyncTest extends DiskPageCompressionIntegrationTest { + /** {@inheritDoc} */ + @Override protected FileIOFactory getFileIOFactory() { + return new AsyncFileIOFactory(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java new file mode 100644 index 0000000..ca7f4ea --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.file.OpenOption; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; +import static org.apache.ignite.configuration.DiskPageCompression.LZ4; +import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; +import static org.apache.ignite.configuration.DiskPageCompression.SNAPPY; +import static org.apache.ignite.configuration.DiskPageCompression.ZSTD; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_DEFAULT_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.LZ4_MIN_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MAX_LEVEL; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.ZSTD_MIN_LEVEL; + +/** + * + */ +public class DiskPageCompressionIntegrationTest extends GridCommonAbstractTest { + /** */ + private DiskPageCompression compression; + + /** */ + private Integer compressionLevel; + + /** */ + private FileIOFactory factory; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + compression = null; + compressionLevel = null; + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception { + DataRegionConfiguration drCfg = new DataRegionConfiguration() + .setPersistenceEnabled(true); + + factory = getFileIOFactory(); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setMetricsEnabled(true) + .setPageSize(MAX_PAGE_SIZE) + .setDefaultDataRegionConfiguration(drCfg) + .setFileIOFactory(U.isLinux() ? factory : new PunchFileIOFactory(factory)); + + return super.getConfiguration(igniteName).setDataStorageConfiguration(dsCfg); + } + + /** + * @return File IO factory. + */ + protected FileIOFactory getFileIOFactory() { + return new RandomAccessFileIOFactory(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Zstd_Max() throws Exception { + compression = ZSTD; + compressionLevel = ZSTD_MAX_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Zstd_Default() throws Exception { + compression = ZSTD; + compressionLevel = null; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Zstd_Min() throws Exception { + compression = ZSTD; + compressionLevel = ZSTD_MIN_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Lz4_Max() throws Exception { + compression = LZ4; + compressionLevel = LZ4_MAX_LEVEL; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Lz4_Default() throws Exception { + compression = LZ4; + compressionLevel = null; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Lz4_Min() throws Exception { + assertEquals(LZ4_MIN_LEVEL, LZ4_DEFAULT_LEVEL); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_SkipGarbage() throws Exception { + compression = SKIP_GARBAGE; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + public void testPageCompression_Snappy() throws Exception { + compression = SNAPPY; + + doTestPageCompression(); + } + + /** + * @throws Exception If failed. + */ + private void doTestPageCompression() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + String cacheName = "test"; + + CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>() + .setName(cacheName) + .setBackups(0) + .setAtomicityMode(ATOMIC) + .setIndexedTypes(Integer.class, TestVal.class) + .setDiskPageCompression(compression) + .setDiskPageCompressionLevel(compressionLevel); + + IgniteCache<Integer,TestVal> cache = ignite.getOrCreateCache(ccfg); + + int cnt = 2_000; + + for (int i = 0; i < cnt; i++) + assertTrue(cache.putIfAbsent(i, new TestVal(i))); + + for (int i = 0; i < cnt; i += 2) + assertEquals(new TestVal(i), cache.getAndRemove(i)); + + GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context() + .cache().context().database()); + + dbMgr.forceCheckpoint("test compression").finishFuture().get(); + + FilePageStoreManager storeMgr = dbMgr.getFileStoreManager(); + + checkFileIOFactory(storeMgr.getPageStoreFileIoFactory()); + + Thread.sleep(100); // Wait for metrics update. + + long storeSize = ignite.dataStorageMetrics().getStorageSize(); + long sparseStoreSize = ignite.dataStorageMetrics().getSparseStorageSize(); + + assertTrue("storeSize: " + storeSize, storeSize > 0); + + if (U.isLinux()) { + assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0); + assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize); + } + else + assertTrue(sparseStoreSize < 0); + + GridCacheContext<?,?> cctx = ignite.cachex(cacheName).context(); + + int cacheId = cctx.cacheId(); + int groupId = cctx.groupId(); + + assertEquals(cacheId, groupId); + + CacheGroupMetricsMXBean mx = cctx.group().mxBean(); + + storeSize = mx.getStorageSize(); + sparseStoreSize = mx.getSparseStorageSize(); + + assertTrue("storeSize: " + storeSize, storeSize > 0); + + if (U.isLinux()) { + assertTrue("sparseSize: " + sparseStoreSize, sparseStoreSize > 0); + assertTrue(storeSize + " > " + sparseStoreSize, storeSize > sparseStoreSize); + } + else + assertTrue(sparseStoreSize < 0); + + int parts = cctx.affinity().partitions(); + + for (int i = 0; i < parts; i++) { + PageStore store = storeMgr.getStore(cacheId, i); + + long realSize = store.size(); + long virtualSize = store.getPageSize() * store.pages(); + long sparseSize = store.getSparseSize(); + + assertTrue(virtualSize > 0); + + error("virt: " + virtualSize + ", real: " + realSize + ", sparse: " + sparseSize); + + if (!store.exists()) + continue; + + if (virtualSize > sparseSize) + return; + } + + fail("No files were compacted."); + } + + /** + */ + public void _testCompressionRatio() throws Exception { + IgniteEx ignite = startGrid(0); + + ignite.cluster().active(true); + + String cacheName = "test"; + + CacheConfiguration<Integer,TestVal> ccfg = new CacheConfiguration<Integer,TestVal>() + .setName(cacheName) + .setBackups(0) + .setAtomicityMode(ATOMIC) + .setIndexedTypes(Integer.class, TestVal.class) + .setAffinity(new RendezvousAffinityFunction().setPartitions(10)) + .setDiskPageCompression(ZSTD); +// .setDiskPageCompressionLevel(compressionLevel); + + ignite.getOrCreateCache(ccfg); + + IgniteInternalCache<Integer,TestVal> cache = ignite.cachex(cacheName); + + CacheGroupMetricsMXBean mx = cache.context().group().mxBean(); + + GridCacheDatabaseSharedManager dbMgr = ((GridCacheDatabaseSharedManager)ignite.context() + .cache().context().database()); + + int cnt = 20_000_000; + + for (int i = 0; i < cnt; i++) { + assertTrue(cache.putIfAbsent(i, new TestVal(i))); + + if (i % 50_000 == 0) { + dbMgr.forceCheckpoint("test").finishFuture().get(); + + long sparse = mx.getSparseStorageSize(); + long size = mx.getStorageSize(); + + System.out.println(i + " >> " + sparse + " / " + size + " = " + ((double)sparse / size)); + } + } + } + + /** + * @param f Factory. + */ + protected void checkFileIOFactory(FileIOFactory f) { + if (!U.isLinux()) + f = ((PunchFileIOFactory)f).delegate; + + assertSame(factory, f); + } + + /** + */ + static class TestVal implements Serializable { + /** */ + static final long serialVersionUID = 1L; + + /** */ + @QuerySqlField + String str; + + /** */ + int i; + + /** */ + @QuerySqlField + long x; + + /** */ + @QuerySqlField + UUID id; + + TestVal(int i) { + this.str = i + "bla bla bla!"; + this.i = -i; + this.x = 0xffaabbccdd773311L + i; + this.id = new UUID(i,-i); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TestVal testVal = (TestVal)o; + + if (i != testVal.i) return false; + if (x != testVal.x) return false; + if (str != null ? !str.equals(testVal.str) : testVal.str != null) return false; + return id != null ? id.equals(testVal.id) : testVal.id == null; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = str != null ? str.hashCode() : 0; + result = 31 * result + i; + result = 31 * result + (int)(x ^ (x >>> 32)); + result = 31 * result + (id != null ? id.hashCode() : 0); + return result; + } + } + + /** + */ + static class PunchFileIO extends FileIODecorator { + /** */ + private ConcurrentMap<Long, Integer> holes = new ConcurrentHashMap<>(); + + /** + * @param delegate File I/O delegate + */ + public PunchFileIO(FileIO delegate) { + super(Objects.requireNonNull(delegate)); + } + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + assertFalse(U.isLinux()); + + return 4 * 1024; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + assertFalse(U.isLinux()); + + long holesSize = holes.values().stream().mapToLong(x -> x).sum(); + + try { + return size() - holesSize; + } + catch (IOException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public int writeFully(ByteBuffer srcBuf, long position) throws IOException { + assertFalse(U.isLinux()); + + holes.remove(position); + return super.writeFully(srcBuf, position); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long pos, int len) { + assertFalse(U.isLinux()); + + assertTrue(len > 0); + + int blockSize = getFileSystemBlockSize(); + + len = len / blockSize * blockSize; + + if (len > 0) + holes.put(pos, len); + + return len; + } + } + + /** + */ + static class PunchFileIOFactory implements FileIOFactory { + /** */ + final FileIOFactory delegate; + + /** + * @param delegate Delegate. + */ + PunchFileIOFactory(FileIOFactory delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return new PunchFileIO(delegate.create(file)); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + return new PunchFileIO(delegate.create(file, modes)); + } + } +}
