Faster sequential IO (CASSANDRA-8630): merge the RandomAccessReader and NIODataInputStream class hierarchies to share performance optimisation work across all readers.
patch by stefania; reviewed by ariel and benedict for CASSANDRA-8630 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce63ccc8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce63ccc8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce63ccc8 Branch: refs/heads/cassandra-3.0 Commit: ce63ccc842dc6e7129765391c611402eb02a3a23 Parents: 7c0bfe9 Author: Stefania Alborghetti <[email protected]> Authored: Mon Jul 27 16:34:46 2015 +0800 Committer: Benedict Elliott Smith <[email protected]> Committed: Fri Sep 4 12:42:35 2015 +0100 ---------------------------------------------------------------------- build.xml | 8 +- lib/licenses/ohc-0.3.4.txt | 201 -------- lib/licenses/ohc-0.4.2.txt | 201 ++++++++ lib/ohc-core-0.4.2.jar | Bin 0 -> 126802 bytes lib/ohc-core-0.4.jar | Bin 127890 -> 0 bytes lib/ohc-core-j8-0.4.2.jar | Bin 0 -> 4994 bytes lib/ohc-core-j8-0.4.jar | Bin 4989 -> 0 bytes .../apache/cassandra/cache/AutoSavingCache.java | 2 +- .../org/apache/cassandra/cache/OHCProvider.java | 3 +- .../org/apache/cassandra/db/SystemKeyspace.java | 2 +- .../db/commitlog/CommitLogReplayer.java | 19 +- .../cassandra/hints/ChecksummedDataInput.java | 128 +++-- .../org/apache/cassandra/hints/HintsReader.java | 67 ++- .../compress/CompressedRandomAccessReader.java | 147 +++--- .../io/compress/CompressedThrottledReader.java | 48 -- .../io/compress/CompressionMetadata.java | 2 +- .../cassandra/io/sstable/KeyIterator.java | 1 - .../io/sstable/format/SSTableReader.java | 100 ++-- .../cassandra/io/sstable/format/Version.java | 2 + .../io/sstable/format/big/BigFormat.java | 9 + .../io/sstable/format/big/BigTableReader.java | 108 ++--- .../io/sstable/format/big/BigTableWriter.java | 2 - .../cassandra/io/util/AbstractDataInput.java | 343 ------------- .../io/util/BufferedSegmentedFile.java | 24 - .../cassandra/io/util/ByteBufferDataInput.java | 171 ------- 
.../apache/cassandra/io/util/ChannelProxy.java | 4 +- .../io/util/ChecksummedRandomAccessReader.java | 62 ++- .../io/util/CompressedSegmentedFile.java | 100 ++-- .../cassandra/io/util/DataInputBuffer.java | 24 +- .../io/util/DataIntegrityMetadata.java | 11 +- .../apache/cassandra/io/util/FileDataInput.java | 25 +- .../io/util/FileSegmentInputStream.java | 96 ++++ .../cassandra/io/util/ICompressedFile.java | 9 +- .../cassandra/io/util/MemoryInputStream.java | 54 ++- .../cassandra/io/util/MmappedRegions.java | 344 +++++++++++++ .../cassandra/io/util/MmappedSegmentedFile.java | 214 +++----- .../cassandra/io/util/NIODataInputStream.java | 352 +------------- .../cassandra/io/util/RandomAccessReader.java | 417 ++++++++++------ .../io/util/RebufferingInputStream.java | 286 +++++++++++ .../apache/cassandra/io/util/SegmentedFile.java | 115 ++--- .../cassandra/io/util/ThrottledReader.java | 48 -- .../compress/CompressedStreamWriter.java | 10 +- .../apache/cassandra/utils/ByteBufferUtil.java | 3 - .../org/apache/cassandra/utils/Throwables.java | 45 +- .../apache/cassandra/utils/vint/VIntCoding.java | 2 +- .../cassandra/db/commitlog/CommitLogTest.java | 8 +- .../db/commitlog/CommitLogTestReplayer.java | 3 +- .../hints/ChecksummedDataInputTest.java | 227 +++++++-- .../io/ChecksummedRandomAccessReaderTest.java | 127 ----- .../cassandra/io/RandomAccessReaderTest.java | 269 ----------- .../CompressedRandomAccessReaderTest.java | 245 +++++----- .../CompressedSequentialWriterTest.java | 2 +- .../cassandra/io/sstable/SSTableReaderTest.java | 3 +- .../io/util/BufferedDataOutputStreamTest.java | 12 +- .../io/util/BufferedRandomAccessFileTest.java | 91 +--- .../util/ChecksummedRandomAccessReaderTest.java | 127 +++++ .../io/util/FileSegmentInputStreamTest.java | 131 +++++ .../apache/cassandra/io/util/MemoryTest.java | 37 ++ .../cassandra/io/util/MmappedRegionsTest.java | 375 ++++++++++++++ .../io/util/NIODataInputStreamTest.java | 16 +- .../io/util/RandomAccessReaderTest.java | 483 
+++++++++++++++++++ 61 files changed, 3339 insertions(+), 2626 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 28f08d6..252b1a8 100644 --- a/build.xml +++ b/build.xml @@ -415,8 +415,8 @@ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" /> --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" version="4.4.2" /> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" /> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" /> <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" /> <dependency groupId="org.fusesource" artifactId="sigar" version="1.6.4"> <exclusion groupId="log4j" artifactId="log4j"/> @@ -470,8 +470,8 @@ <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> --> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" /> - <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" /> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/> <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/> 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.3.4.txt ---------------------------------------------------------------------- diff --git a/lib/licenses/ohc-0.3.4.txt b/lib/licenses/ohc-0.3.4.txt deleted file mode 100644 index eb6b5d3..0000000 --- a/lib/licenses/ohc-0.3.4.txt +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. 
- - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. 
Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. 
- - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. 
- - Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.4.2.txt ---------------------------------------------------------------------- diff --git a/lib/licenses/ohc-0.4.2.txt b/lib/licenses/ohc-0.4.2.txt new file mode 100644 index 0000000..eb6b5d3 --- /dev/null +++ b/lib/licenses/ohc-0.4.2.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.2.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-0.4.2.jar b/lib/ohc-core-0.4.2.jar new file mode 100644 index 0000000..019adcd Binary files /dev/null and b/lib/ohc-core-0.4.2.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-0.4.jar b/lib/ohc-core-0.4.jar deleted file mode 100644 index 1b1b939..0000000 Binary files a/lib/ohc-core-0.4.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.2.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-j8-0.4.2.jar b/lib/ohc-core-j8-0.4.2.jar new file mode 100644 index 0000000..583f4aa Binary files /dev/null and b/lib/ohc-core-j8-0.4.2.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-j8-0.4.jar b/lib/ohc-core-j8-0.4.jar deleted file mode 100644 index f97ddf5..0000000 Binary files a/lib/ohc-core-j8-0.4.jar and /dev/null differ 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 8650d47..ebd2830 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -68,7 +68,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { public InputStream getInputStream(File dataPath, File crcPath) throws IOException { - return ChecksummedRandomAccessReader.open(dataPath, crcPath); + return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build(); } public OutputStream getOutputStream(File dataPath, File crcPath) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java index b0b4521..c6c6bb7 100644 --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.NIODataInputStream; +import org.apache.cassandra.io.util.RebufferingInputStream; import org.caffinitas.ohc.OHCache; import org.caffinitas.ohc.OHCacheBuilder; @@ -172,7 +173,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> { try { - NIODataInputStream in = new DataInputBuffer(buf, false); + RebufferingInputStream in = new DataInputBuffer(buf, false); boolean isSentinel = in.readBoolean(); if (isSentinel) 
return new RowCacheSentinel(in.readLong()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index cf8e14d..d54ee8b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -676,7 +676,7 @@ public final class SystemKeyspace { try { - NIODataInputStream in = new DataInputBuffer(bytes, true); + RebufferingInputStream in = new DataInputBuffer(bytes, true); return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 4f50008..7049191 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -47,12 +47,13 @@ import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.FileSegmentInputStream; +import org.apache.cassandra.io.util.RebufferingInputStream; import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.compress.ICompressor; -import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataInputBuffer; import 
org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.FBUtilities; @@ -290,8 +291,8 @@ public class CommitLogReplayer public void recover(File file, boolean tolerateTruncation) throws IOException { CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); - RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); - try + try(ChannelProxy channel = new ChannelProxy(file); + RandomAccessReader reader = RandomAccessReader.open(channel)) { if (desc.version < CommitLogDescriptor.VERSION_21) { @@ -299,7 +300,7 @@ public class CommitLogReplayer return; if (globalPosition.segment == desc.id) reader.seek(globalPosition.position); - replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation); + replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation); return; } @@ -388,7 +389,7 @@ public class CommitLogReplayer if (uncompressedLength > uncompressedBuffer.length) uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)]; compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0); - sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0); + sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos); errorContext = "compressed section at " + start + " in " + errorContext; } catch (IOException | ArrayIndexOutOfBoundsException e) @@ -403,10 +404,6 @@ public class CommitLogReplayer if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection)) break; } - } - finally - { - FileUtils.closeQuietly(reader); logger.info("Finished reading {}", file); } } @@ -522,7 +519,7 @@ public class CommitLogReplayer { 
final Mutation mutation; - try (NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) + try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) { mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index fa727bc..543f14e 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -17,98 +17,148 @@ */ package org.apache.cassandra.hints; +import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.zip.CRC32; -import org.apache.cassandra.io.util.AbstractDataInput; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.RandomAccessReader; /** - * An {@link AbstractDataInput} wrapper that calctulates the CRC in place. + * A {@link RandomAccessReader} wrapper that calculates the CRC in place. * * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want - * to allocate an extra byte array just that purpose. + * to allocate an extra byte array just for that purpose. The CRC can be embedded in the input stream and checked via checkCrc(). * - * In addition to calculating the CRC, allows to enforce a maximim known size. This is needed + * In addition to calculating the CRC, it allows enforcing a maximum known size. 
This is needed * so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a * corrupted sequence by reading a huge corrupted length of bytes via * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}. */ -public final class ChecksummedDataInput extends AbstractDataInput +public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel { private final CRC32 crc; - private final AbstractDataInput source; - private int limit; + private int crcPosition; + private boolean crcUpdateDisabled; - private ChecksummedDataInput(AbstractDataInput source) + private long limit; + private FileMark limitMark; + + private ChecksummedDataInput(Builder builder) { - this.source = source; + super(builder); crc = new CRC32(); - limit = Integer.MAX_VALUE; + crcPosition = 0; + crcUpdateDisabled = false; + + resetLimit(); } - public static ChecksummedDataInput wrap(AbstractDataInput source) + public static ChecksummedDataInput open(File file) { - return new ChecksummedDataInput(source); + return new Builder(new ChannelProxy(file)).build(); } public void resetCrc() { crc.reset(); + crcPosition = buffer.position(); } - public void resetLimit() + public void limit(long newLimit) { - limit = Integer.MAX_VALUE; + limit = newLimit; + limitMark = mark(); } - public void limit(int newLimit) + public void resetLimit() { - limit = newLimit; + limit = Long.MAX_VALUE; + limitMark = null; } - public int bytesRemaining() + public void checkLimit(int length) throws IOException { - return limit; + if (limitMark == null) + return; + + if ((bytesPastLimit() + length) > limit) + throw new IOException("Digest mismatch exception"); } - public int getCrc() + public long bytesPastLimit() { - return (int) crc.getValue(); + assert limitMark != null; + return bytesPastMark(limitMark); } - public void seek(long position) throws IOException + public boolean checkCrc() throws IOException { - 
source.seek(position); + try + { + updateCrc(); + + // we must disable crc updates in case we rebuffer + // when calling readInt() + crcUpdateDisabled = true; + return ((int) crc.getValue()) == readInt(); + } + finally + { + crcPosition = buffer.position(); + crcUpdateDisabled = false; + } } - public long getPosition() + @Override + public void readFully(byte[] b) throws IOException { - return source.getPosition(); + checkLimit(b.length); + super.readFully(b); } - public long getPositionLimit() + @Override + public int read(byte[] b, int off, int len) throws IOException { - return source.getPositionLimit(); + checkLimit(len); + return super.read(b, off, len); } - public int read() throws IOException + @Override + public void reBuffer() { - int b = source.read(); - crc.update(b); - limit--; - return b; + updateCrc(); + super.reBuffer(); + crcPosition = buffer.position(); } - @Override - public int read(byte[] buff, int offset, int length) throws IOException + private void updateCrc() { - if (length > limit) - throw new IOException("Digest mismatch exception"); + if (crcPosition == buffer.position() | crcUpdateDisabled) + return; + + assert crcPosition >= 0 && crcPosition < buffer.position(); - int copied = source.read(buff, offset, length); - crc.update(buff, offset, copied); - limit -= copied; - return copied; + ByteBuffer unprocessed = buffer.duplicate(); + unprocessed.position(crcPosition) + .limit(buffer.position()); + + crc.update(unprocessed); + } + + public final static class Builder extends RandomAccessReader.Builder + { + public Builder(ChannelProxy channel) + { + super(channel); + } + + public ChecksummedDataInput build() + { + return new ChecksummedDataInput(this); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java 
b/src/java/org/apache/cassandra/hints/HintsReader.java index 7d164b4..bc83654 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -24,6 +24,7 @@ import java.util.Iterator; import javax.annotation.Nullable; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; @@ -32,7 +33,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CLibrary; @@ -57,25 +57,23 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> private final HintsDescriptor descriptor; private final File file; - private final RandomAccessReader reader; - private final ChecksummedDataInput crcInput; + private final ChecksummedDataInput input; // we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized @Nullable private final RateLimiter rateLimiter; - private HintsReader(HintsDescriptor descriptor, File file, RandomAccessReader reader, RateLimiter rateLimiter) + private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter) { this.descriptor = descriptor; this.file = file; - this.reader = reader; - this.crcInput = ChecksummedDataInput.wrap(reader); + this.input = reader; this.rateLimiter = rateLimiter; } static HintsReader open(File file, RateLimiter rateLimiter) { - RandomAccessReader reader = RandomAccessReader.open(file); + ChecksummedDataInput reader = ChecksummedDataInput.open(file); try { HintsDescriptor descriptor = HintsDescriptor.deserialize(reader); @@ -83,7 +81,7 @@ final class HintsReader 
implements AutoCloseable, Iterable<HintsReader.Page> } catch (IOException e) { - reader.close(); + FileUtils.closeQuietly(reader); throw new FSReadError(e, file); } } @@ -95,7 +93,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> public void close() { - FileUtils.closeQuietly(reader); + FileUtils.closeQuietly(input); } public HintsDescriptor descriptor() @@ -105,7 +103,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> void seek(long newPosition) { - reader.seek(newPosition); + input.seek(newPosition); } public Iterator<Page> iterator() @@ -138,12 +136,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> @SuppressWarnings("resource") protected Page computeNext() { - CLibrary.trySkipCache(reader.getChannel().getFileDescriptor(), 0, reader.getFilePointer(), reader.getPath()); + CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath()); - if (reader.length() == reader.getFilePointer()) + if (input.length() == input.getFilePointer()) return endOfData(); - return new Page(reader.getFilePointer()); + return new Page(input.getFilePointer()); } } @@ -166,9 +164,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> do { - long position = reader.getFilePointer(); + long position = input.getFilePointer(); - if (reader.length() == position) + if (input.length() == position) return endOfData(); // reached EOF if (position - offset >= PAGE_SIZE) @@ -190,13 +188,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> private Hint computeNextInternal() throws IOException { - crcInput.resetCrc(); - crcInput.resetLimit(); + input.resetCrc(); + input.resetLimit(); - int size = crcInput.readInt(); + int size = input.readInt(); // if we cannot corroborate the size via crc, then we cannot safely skip this hint - if (reader.readInt() != crcInput.getCrc()) + if (!input.checkCrc()) throw new 
IOException("Digest mismatch exception"); return readHint(size); @@ -206,12 +204,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { if (rateLimiter != null) rateLimiter.acquire(size); - crcInput.limit(size); + input.limit(size); Hint hint; try { - hint = Hint.serializer.deserialize(crcInput, descriptor.messagingVersion()); + hint = Hint.serializer.deserialize(input, descriptor.messagingVersion()); + input.checkLimit(0); } catch (UnknownColumnFamilyException e) { @@ -219,18 +218,18 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> descriptor.hostId, e.cfId, descriptor.fileName()); - reader.skipBytes(crcInput.bytesRemaining()); + input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit())); return null; } - if (reader.readInt() == crcInput.getCrc()) + if (input.checkCrc()) return hint; // log a warning and skip the corrupted entry logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}", descriptor.hostId, - crcInput.getPosition() - size - 4, + input.getPosition() - size - 4, descriptor.fileName()); return null; } @@ -255,9 +254,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> do { - long position = reader.getFilePointer(); + long position = input.getFilePointer(); - if (reader.length() == position) + if (input.length() == position) return endOfData(); // reached EOF if (position - offset >= PAGE_SIZE) @@ -279,13 +278,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> private ByteBuffer computeNextInternal() throws IOException { - crcInput.resetCrc(); - crcInput.resetLimit(); + input.resetCrc(); + input.resetLimit(); - int size = crcInput.readInt(); + int size = input.readInt(); // if we cannot corroborate the size via crc, then we cannot safely skip this hint - if (reader.readInt() != crcInput.getCrc()) + if (!input.checkCrc()) throw new IOException("Digest mismatch exception"); return 
readBuffer(size); @@ -295,16 +294,16 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { if (rateLimiter != null) rateLimiter.acquire(size); - crcInput.limit(size); + input.limit(size); - ByteBuffer buffer = ByteBufferUtil.read(crcInput, size); - if (reader.readInt() == crcInput.getCrc()) + ByteBuffer buffer = ByteBufferUtil.read(input, size); + if (input.checkCrc()) return buffer; // log a warning and skip the corrupted entry logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}", descriptor.hostId, - crcInput.getPosition() - size - 4, + input.getPosition() - size - 4, descriptor.fileName()); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index c38f4d2..0242871 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -19,11 +19,7 @@ package org.apache.cassandra.io.compress; import java.io.*; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.zip.Adler32; import java.util.zip.Checksum; import com.google.common.primitives.Ints; @@ -31,7 +27,6 @@ import com.google.common.primitives.Ints; import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.util.*; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.memory.BufferPool; /** @@ -40,18 +35,6 @@ import org.apache.cassandra.utils.memory.BufferPool; */ public 
class CompressedRandomAccessReader extends RandomAccessReader { - public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata) - { - return new CompressedRandomAccessReader(channel, metadata, null); - } - - public static CompressedRandomAccessReader open(ICompressedFile file) - { - return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file); - } - - private final TreeMap<Long, MappedByteBuffer> chunkSegments; - private final CompressionMetadata metadata; // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer. @@ -63,39 +46,59 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private ByteBuffer checksumBytes; - protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) + protected CompressedRandomAccessReader(Builder builder) { - super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType()); - this.metadata = metadata; - checksum = metadata.checksumType.newInstance(); + super(builder.initializeBuffers(false)); + this.metadata = builder.metadata; + this.checksum = metadata.checksumType.newInstance(); - chunkSegments = file == null ? 
null : file.chunkSegments(); - if (chunkSegments == null) - { - compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().preferredBufferType()); - checksumBytes = ByteBuffer.wrap(new byte[4]); - } + initializeBuffer(); } - protected int getBufferSize(int size) + @Override + protected int getBufferSize(RandomAccessReader.Builder builder) { - assert Integer.bitCount(size) == 1; //must be a power of two - return size; + // this is the chunk data length, throttling is OK with this + return builder.bufferSize; } @Override - public void close() + protected void initializeBuffer() { - super.close(); + buffer = allocateBuffer(bufferSize); + buffer.limit(0); - if (compressed != null) + if (regions == null) { - BufferPool.put(compressed); - compressed = null; + compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())); + checksumBytes = ByteBuffer.wrap(new byte[4]); } } - private void reBufferStandard() + @Override + protected void releaseBuffer() + { + try + { + if (buffer != null) + { + BufferPool.put(buffer); + buffer = null; + } + } + finally + { + // this will always be null if using mmap access mode (unlike in parent, where buffer is set to a region) + if (compressed != null) + { + BufferPool.put(compressed); + compressed = null; + } + } + } + + @Override + protected void reBufferStandard() { try { @@ -105,13 +108,19 @@ public class CompressedRandomAccessReader extends RandomAccessReader CompressionMetadata.Chunk chunk = metadata.chunkFor(position); if (compressed.capacity() < chunk.length) - compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType()); + { + BufferPool.put(compressed); + compressed = allocateBuffer(chunk.length); + } else + { compressed.clear(); - compressed.limit(chunk.length); + } + compressed.limit(chunk.length); if (channel.read(compressed, chunk.offset) != chunk.length) throw new 
CorruptBlockException(getPath(), chunk); + compressed.flip(); buffer.clear(); @@ -158,7 +167,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader } } - private void reBufferMmap() + @Override + protected void reBufferMmap() { try { @@ -167,10 +177,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader CompressionMetadata.Chunk chunk = metadata.chunkFor(position); - Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset); - long segmentOffset = entry.getKey(); + MmappedRegions.Region region = regions.floor(chunk.offset); + long segmentOffset = region.bottom(); int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset); - ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java + ByteBuffer compressedChunk = region.buffer.duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length); @@ -218,19 +228,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader } - @Override - protected void reBuffer() - { - if (chunkSegments != null) - { - reBufferMmap(); - } - else - { - reBufferStandard(); - } - } - private int checksum(CompressionMetadata.Chunk chunk) throws IOException { long position = chunk.offset + chunk.length; @@ -240,11 +237,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader return checksumBytes.getInt(0); } - public int getTotalBufferSize() - { - return super.getTotalBufferSize() + (chunkSegments != null ? 
0 : compressed.capacity()); - } - @Override public long length() { @@ -256,4 +248,39 @@ public class CompressedRandomAccessReader extends RandomAccessReader { return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength); } + + public final static class Builder extends RandomAccessReader.Builder + { + private final CompressionMetadata metadata; + + public Builder(ICompressedFile file) + { + super(file.channel()); + this.metadata = applyMetadata(file.getMetadata()); + this.regions = file.regions(); + } + + public Builder(ChannelProxy channel, CompressionMetadata metadata) + { + super(channel); + this.metadata = applyMetadata(metadata); + } + + private CompressionMetadata applyMetadata(CompressionMetadata metadata) + { + this.overrideLength = metadata.compressedFileLength; + this.bufferSize = metadata.chunkLength(); + this.bufferType = metadata.compressor().preferredBufferType(); + + assert Integer.bitCount(this.bufferSize) == 1; //must be a power of two + + return metadata; + } + + @Override + public RandomAccessReader build() + { + return new CompressedRandomAccessReader(this); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java deleted file mode 100644 index ea5edaf..0000000 --- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java +++ /dev/null @@ -1,48 +0,0 @@ -package org.apache.cassandra.io.compress; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -import com.google.common.util.concurrent.RateLimiter; - -import org.apache.cassandra.io.util.ChannelProxy; -import org.apache.cassandra.io.util.ICompressedFile; - -public class CompressedThrottledReader extends CompressedRandomAccessReader -{ - private final RateLimiter limiter; - - public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) - { - super(channel, metadata, file); - this.limiter = limiter; - } - - protected void reBuffer() - { - limiter.acquire(buffer.capacity()); - super.reBuffer(); - } - - public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter) - { - return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index f5d8f7e..1681b0c 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -92,7 +92,7 @@ public class CompressionMetadata } @VisibleForTesting - CompressionMetadata(String 
indexFilePath, long compressedLength, ChecksumType checksumType) + public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType) { this.indexFilePath = indexFilePath; this.checksumType = checksumType; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index 124720e..6f1e2f4 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -28,7 +28,6 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CloseableIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 522f7a1..5d8ab50 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -828,8 +828,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (!summaryLoaded) { summaryBuilder.maybeAddEntry(decoratedKey, indexPosition); - ibuilder.addPotentialBoundary(indexPosition); - dbuilder.addPotentialBoundary(indexEntry.position); } } @@ -868,8 +866,8 @@ public abstract class SSTableReader extends SSTable implements 
SelfRefCounted<SS metadata.params.minIndexInterval, metadata.params.maxIndexInterval); first = decorateKey(ByteBufferUtil.readWithLength(iStream)); last = decorateKey(ByteBufferUtil.readWithLength(iStream)); - ibuilder.deserializeBounds(iStream); - dbuilder.deserializeBounds(iStream); + ibuilder.deserializeBounds(iStream, descriptor.version); + dbuilder.deserializeBounds(iStream, descriptor.version); } catch (IOException e) { @@ -904,39 +902,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (ifile == null) return false; - Iterator<FileDataInput> segments = ifile.iterator(0); int i = 0; int summaryEntriesChecked = 0; int expectedIndexInterval = getMinIndexInterval(); - while (segments.hasNext()) + String path = null; + try (FileDataInput in = ifile.createReader(0)) { - String path = null; - try (FileDataInput in = segments.next()) + path = in.getPath(); + while (!in.isEOF()) { - path = in.getPath(); - while (!in.isEOF()) + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + if (i % expectedIndexInterval == 0) { - ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); - if (i % expectedIndexInterval == 0) - { - ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval)); - if (!summaryKey.equals(indexKey)) - return false; - summaryEntriesChecked++; + ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval)); + if (!summaryKey.equals(indexKey)) + return false; + summaryEntriesChecked++; - if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL) - return true; - } - RowIndexEntry.Serializer.skip(in); - i++; + if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL) + return true; } - } - catch (IOException e) - { - markSuspect(); - throw new CorruptSSTableException(e, path); + RowIndexEntry.Serializer.skip(in); + i++; } } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, path); + } return true; } @@ -972,8 
+966,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel()); ByteBufferUtil.writeWithLength(first.getKey(), oStream); ByteBufferUtil.writeWithLength(last.getKey(), oStream); - ibuilder.serializeBounds(oStream); - dbuilder.serializeBounds(oStream); + ibuilder.serializeBounds(oStream, descriptor.version); + dbuilder.serializeBounds(oStream, descriptor.version); } catch (IOException e) { @@ -1600,29 +1594,25 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (ifile == null) return null; - Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); - while (segments.hasNext()) + String path = null; + try (FileDataInput in = ifile.createReader(sampledPosition)) { - String path = null; - try (FileDataInput in = segments.next();) + path = in.getPath(); + while (!in.isEOF()) { - path = in.getPath(); - while (!in.isEOF()) - { - ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); - DecoratedKey indexDecoratedKey = decorateKey(indexKey); - if (indexDecoratedKey.compareTo(token) > 0) - return indexDecoratedKey; + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + DecoratedKey indexDecoratedKey = decorateKey(indexKey); + if (indexDecoratedKey.compareTo(token) > 0) + return indexDecoratedKey; - RowIndexEntry.Serializer.skip(in); - } - } - catch (IOException e) - { - markSuspect(); - throw new CorruptSSTableException(e, path); + RowIndexEntry.Serializer.skip(in); } } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, path); + } return null; } @@ -1744,11 +1734,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS */ public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift); - - public FileDataInput getFileDataInput(long position) { - return 
dfile.getSegment(position); + return dfile.createReader(position); } /** @@ -1939,7 +1927,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public RandomAccessReader openDataReader(RateLimiter limiter) { assert limiter != null; - return dfile.createThrottledReader(limiter); + return dfile.createReader(limiter); } public RandomAccessReader openDataReader() @@ -1954,6 +1942,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return null; } + public ChannelProxy getDataChannel() + { + return dfile.channel; + } + + public ChannelProxy getIndexChannel() + { + return ifile.channel; + } + /** * @param component component to get timestamp. * @return last modified time for given component. 0 if given component does not exist or IO error occurs. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/Version.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java index 4fcf055..16829ab 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/Version.java +++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java @@ -68,6 +68,8 @@ public abstract class Version public abstract boolean hasCompactionAncestors(); + public abstract boolean hasBoundaries(); + public String getVersion() { return version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index d65710e..cbc2c39 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -134,6 +134,7 @@ public class BigFormat implements SSTableFormat private final boolean newFileName; public final boolean storeRows; public final int correspondingMessagingVersion; // Only use by storage that 'storeRows' so far + public final boolean hasBoundaries; /** * CASSANDRA-8413: 3.0 bloom filter representation changed (two longs just swapped) * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution. @@ -176,6 +177,8 @@ public class BigFormat implements SSTableFormat correspondingMessagingVersion = storeRows ? MessagingService.VERSION_30 : MessagingService.VERSION_21; + + hasBoundaries = version.compareTo("ma") < 0; } @Override @@ -251,6 +254,12 @@ public class BigFormat implements SSTableFormat } @Override + public boolean hasBoundaries() + { + return hasBoundaries; + } + + @Override public boolean isCompatible() { return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 87608fd..4b66942 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -178,78 +178,74 @@ public class BigTableReader extends SSTableReader // is lesser than the first key of next interval (and in that case we must return the position of the first key // of the next interval). 
int i = 0; - Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); - while (segments.hasNext()) + String path = null; + try (FileDataInput in = ifile.createReader(sampledPosition)) { - String path = null; - try (FileDataInput in = segments.next()) + path = in.getPath(); + while (!in.isEOF()) { - path = in.getPath(); - while (!in.isEOF()) - { - i++; + i++; - ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); - boolean opSatisfied; // did we find an appropriate position for the op requested - boolean exactMatch; // is the current position an exact match for the key, suitable for caching + boolean opSatisfied; // did we find an appropriate position for the op requested + boolean exactMatch; // is the current position an exact match for the key, suitable for caching - // Compare raw keys if possible for performance, otherwise compare decorated keys. - if (op == Operator.EQ && i <= effectiveInterval) - { - opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey()); - } - else + // Compare raw keys if possible for performance, otherwise compare decorated keys. 
+ if (op == Operator.EQ && i <= effectiveInterval) + { + opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey()); + } + else + { + DecoratedKey indexDecoratedKey = decorateKey(indexKey); + int comparison = indexDecoratedKey.compareTo(key); + int v = op.apply(comparison); + opSatisfied = (v == 0); + exactMatch = (comparison == 0); + if (v < 0) { - DecoratedKey indexDecoratedKey = decorateKey(indexKey); - int comparison = indexDecoratedKey.compareTo(key); - int v = op.apply(comparison); - opSatisfied = (v == 0); - exactMatch = (comparison == 0); - if (v < 0) - { - Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); - return null; - } + Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); + return null; } + } - if (opSatisfied) + if (opSatisfied) + { + // read data position from index entry + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in); + if (exactMatch && updateCacheAndStats) { - // read data position from index entry - RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in); - if (exactMatch && updateCacheAndStats) - { - assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key - DecoratedKey decoratedKey = (DecoratedKey)key; + assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key + DecoratedKey decoratedKey = (DecoratedKey)key; - if (logger.isTraceEnabled()) + if (logger.isTraceEnabled()) + { + // expensive sanity check! see CASSANDRA-4687 + try (FileDataInput fdi = dfile.createReader(indexEntry.position)) { - // expensive sanity check! 
see CASSANDRA-4687 - try (FileDataInput fdi = dfile.getSegment(indexEntry.position)) - { - DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi)); - if (!keyInDisk.equals(key)) - throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); - } + DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + if (!keyInDisk.equals(key)) + throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); } - - // store exact match for the key - cacheKey(decoratedKey, indexEntry); } - if (op == Operator.EQ && updateCacheAndStats) - bloomFilterTracker.addTruePositive(); - Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); - return indexEntry; - } - RowIndexEntry.Serializer.skip(in); + // store exact match for the key + cacheKey(decoratedKey, indexEntry); + } + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addTruePositive(); + Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); + return indexEntry; } + + RowIndexEntry.Serializer.skip(in); } - catch (IOException e) - { - markSuspect(); - throw new CorruptSSTableException(e, path); - } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, path); } if (op == SSTableReader.Operator.EQ && updateCacheAndStats) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 77bf3d6..38dab9a 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -121,7 +121,6 @@ public class BigTableWriter extends SSTableWriter if (logger.isTraceEnabled()) logger.trace("wrote {} at {}", decoratedKey, dataEnd); iwriter.append(decoratedKey, index, dataEnd); - dbuilder.addPotentialBoundary(dataEnd); } /** @@ -420,7 +419,6 @@ public class BigTableWriter extends SSTableWriter logger.trace("wrote index entry: {} at {}", indexEntry, indexStart); summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd); - builder.addPotentialBoundary(indexStart); } /**
