Updated Branches:
refs/heads/cassandra-1.2.0 08cc800ba -> 620a4b7a4
Merge branch 'cassandra-1.1' into cassandra-1.2.0
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/io/compress/CompressionParameters.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/620a4b7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/620a4b7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/620a4b7a
Branch: refs/heads/cassandra-1.2.0
Commit: 620a4b7a4e86e1f31f9823e12a8d5473980c7f03
Parents: 08cc800 1a66ee9
Author: Aleksey Yeschenko <[email protected]>
Authored: Wed Dec 19 16:07:39 2012 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed Dec 19 16:07:39 2012 +0300
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 14 ++++
.../cassandra/db/ColumnFamilyStoreMBean.java | 5 ++
.../io/compress/CompressedRandomAccessReader.java | 2 +-
.../io/compress/CompressionParameters.java | 54 ++++++++++++--
.../streaming/compress/CompressedInputStream.java | 2 +-
6 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7a17236,38d9d47..6edf7bb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,142 -1,29 +1,143 @@@
-1.1.9
+1.2.0-rc2
+ * fix nodetool ownership display with vnodes (CASSANDRA-5065)
+ * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
+ * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
+ * Fix SimpleAuthorizer example (CASSANDRA-5072)
+ * cqlsh: force CL.ONE for tracing and system.schema* queries (CASSANDRA-5070)
+ * Includes cassandra-shuffle in the debian package (CASSANDRA-5058)
+Merged from 1.1:
* fix multithreaded compaction deadlock (CASSANDRA-4492)
+ * fix temporarily missing schema after upgrade from pre-1.1.5
(CASSANDRA-5061)
+ * Fix ALTER TABLE overriding compression options with defaults
- (CASSANDRA-4996, CASSANDRA-5066)
++ (CASSANDRA-4996, 5066)
+ * fix specifying and altering crc_check_chance (CASSANDRA-5053)
-1.1.8
- * reset getRangeSlice filter after finishing a row for get_paged_slice
- (CASSANDRA-4919)
- * fix temporarily missing schema after upgrade from pre-1.1.5
(CASSANDRA-5061)
+1.2-rc1
+ * rename rpc_timeout settings to request_timeout (CASSANDRA-5027)
+ * add BF with 0.1 FP to LCS by default (CASSANDRA-5029)
+ * Fix preparing insert queries (CASSANDRA-5016)
+ * Fix preparing queries with counter increment (CASSANDRA-5022)
+ * Fix preparing updates with collections (CASSANDRA-5017)
+ * Don't generate UUID based on other node address (CASSANDRA-5002)
+ * Fix message when trying to alter a clustering key type (CASSANDRA-5012)
+ * Update IAuthenticator to match the new IAuthorizer (CASSANDRA-5003)
+ * Fix inserting only a key in CQL3 (CASSANDRA-5040)
+ * Fix CQL3 token() function when used with strings (CASSANDRA-5050)
+Merged from 1.1:
* reduce log spam from invalid counter shards (CASSANDRA-5026)
* Improve schema propagation performance (CASSANDRA-5025)
- * Fall back to old describe_splits if d_s_ex is not available
(CASSANDRA-4803)
- * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
+ * Fix for IndexHelper.IndexFor throws OOB Exception (CASSANDRA-5030)
+ * cqlsh: make it possible to describe thrift CFs (CASSANDRA-4827)
* cqlsh: fix timestamp formatting on some platforms (CASSANDRA-5046)
- * Fix ALTER TABLE overriding compression options with defaults
(CASSANDRA-4996, 5066)
- * Avoid error opening data file on startup (CASSANDRA-4984)
- * Fix wrong index_options in cli 'show schema' (CASSANDRA-5008)
- * Allow overriding number of available processor (CASSANDRA-4790)
-1.1.7
- * cqlsh: improve COPY FROM performance (CASSANDRA-4921)
+1.2-beta3
+ * make consistency level configurable in cqlsh (CASSANDRA-4829)
+ * fix cqlsh rendering of blob fields (CASSANDRA-4970)
+ * fix cqlsh DESCRIBE command (CASSANDRA-4913)
+ * save truncation position in system table (CASSANDRA-4906)
+ * Move CompressionMetadata off-heap (CASSANDRA-4937)
+ * allow CLI to GET cql3 columnfamily data (CASSANDRA-4924)
+ * Fix rare race condition in getExpireTimeForEndpoint (CASSANDRA-4402)
+ * acquire references to overlapping sstables during compaction so bloom
filter
+ doesn't get free'd prematurely (CASSANDRA-4934)
+ * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928)
+ * Separate tracing from Log4J (CASSANDRA-4861)
+ * Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
+ * Better printing of AbstractBounds for tracing (CASSANDRA-4931)
+ * Optimize mostRecentTombstone check in CC.collectAllData (CASSANDRA-4883)
+ * Change stream session ID to UUID to avoid collision from same node
(CASSANDRA-4813)
+ * Use Stats.db when bulk loading if present (CASSANDRA-4957)
+ * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
+ * (cql3) Remove arbitrary SELECT limit (CASSANDRA-4918)
+ * Correctly handle prepared operation on collections (CASSANDRA-4945)
+ * Fix CQL3 LIMIT (CASSANDRA-4877)
+ * Fix Stress for CQL3 (CASSANDRA-4979)
+ * Remove cassandra specific exceptions from JMX interface (CASSANDRA-4893)
+ * (CQL3) Force using ALLOW FILTERING on potentially inefficient queries
(CASSANDRA-4915)
+ * (cql3) Fix adding column when the table has collections (CASSANDRA-4982)
+ * (cql3) Fix allowing collections with compact storage (CASSANDRA-4990)
+ * (cql3) Refuse ttl/writetime function on collections (CASSANDRA-4992)
+ * Replace IAuthority with new IAuthorizer (CASSANDRA-4874)
+ * clqsh: fix KEY pseudocolumn escaping when describing Thrift tables
+ in CQL3 mode (CASSANDRA-4955)
* add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
* fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
+Merged from 1.1:
+ * Fall back to old describe_splits if d_s_ex is not available
(CASSANDRA-4803)
+ * Improve error reporting when streaming ranges fail (CASSANDRA-5009)
+ * Fix cqlsh timestamp formatting of timezone info (CASSANDRA-4746)
+ * Fix assertion failure with leveled compaction (CASSANDRA-4799)
+ * Check for null end_token in get_range_slice (CASSANDRA-4804)
+ * Remove all remnants of removed nodes (CASSANDRA-4840)
+ * Add aut-reloading of the log4j file in debian package (CASSANDRA-4855)
+ * Fix estimated row cache entry size (CASSANDRA-4860)
+ * reset getRangeSlice filter after finishing a row for get_paged_slice
+ (CASSANDRA-4919)
* expunge row cache post-truncate (CASSANDRA-4940)
- * remove IAuthority2 (CASSANDRA-4875)
+ * Allow static CF definition with compact storage (CASSANDRA-4910)
+ * Fix endless loop/compaction of schema_* CFs due to broken timestamps
(CASSANDRA-4880)
+ * Fix 'wrong class type' assertion in CounterColumn (CASSANDRA-4976)
+
+
+1.2-beta2
+ * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876)
+ * off-heap bloom filters for row keys (CASSANDRA_4865)
+ * add extension point for sstable components (CASSANDRA-4049)
+ * improve tracing output (CASSANDRA-4852, 4862)
+ * make TRACE verb droppable (CASSANDRA-4672)
+ * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
+ * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)
+ * Make hint delivery asynchronous (CASSANDRA-4761)
+ * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609,
4610)
+ * cassandra-cli: allow Double value type to be inserted to a column
(CASSANDRA-4661)
+ * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
+ * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
+ * include metadata for system keyspace itself in schema tables
(CASSANDRA-4416)
+ * add check to PropertyFileSnitch to verify presence of location for
+ local node (CASSANDRA-4728)
+ * add PBSPredictor consistency modeler (CASSANDRA-4261)
+ * remove vestiges of Thrift unframed mode (CASSANDRA-4729)
+ * optimize single-row PK lookups (CASSANDRA-4710)
+ * adjust blockFor calculation to account for pending ranges due to node
+ movement (CASSANDRA-833)
+ * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
+ * (CQL3) Make prepared statement global instead of per connection
+ (CASSANDRA-4449)
+ * Fix scrubbing of CQL3 created tables (CASSANDRA-4685)
+ * (CQL3) Fix validation when using counter and regular columns in the same
+ table (CASSANDRA-4706)
+ * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648)
+ * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738)
+ * Add support for multiple column family outputs in CFOF (CASSANDRA-4208)
+ * Support repairing only the local DC nodes (CASSANDRA-4747)
+ * Use rpc_address for binary protocol and change default port (CASSANRA-4751)
+ * Fix use of collections in prepared statements (CASSANDRA-4739)
+ * Store more information into peers table (CASSANDRA-4351, 4814)
+ * Configurable bucket size for size tiered compaction (CASSANDRA-4704)
+ * Run leveled compaction in parallel (CASSANDRA-4310)
+ * Fix potential NPE during CFS reload (CASSANDRA-4786)
+ * Composite indexes may miss results (CASSANDRA-4796)
+ * Move consistency level to the protocol level (CASSANDRA-4734, 4824)
+ * Fix Subcolumn slice ends not respected (CASSANDRA-4826)
+ * Fix Assertion error in cql3 select (CASSANDRA-4783)
+ * Fix list prepend logic (CQL3) (CASSANDRA-4835)
+ * Add booleans as literals in CQL3 (CASSANDRA-4776)
+ * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
+ * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
+ * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
+ * Remove system tables accounting from schema (CASSANDRA-4850)
+ * (cql3) Force provided columns in clustering key order in
+ 'CLUSTERING ORDER BY' (CASSANDRA-4881)
+ * Fix composite index bug (CASSANDRA-4884)
+ * Fix short read protection for CQL3 (CASSANDRA-4882)
+ * Add tracing support to the binary protocol (CASSANDRA-4699)
+ * (cql3) Don't allow prepared marker inside collections (CASSANDRA-4890)
+ * Re-allow order by on non-selected columns (CASSANDRA-4645)
+ * Bug when composite index is created in a table having collections
(CASSANDRA-4909)
+ * log index scan subject in CompositesSearcher (CASSANDRA-4904)
+Merged from 1.1:
* add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
* fix indexing empty column values (CASSANDRA-4832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 364565f,8284d38..ef3e14a
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -204,18 -196,18 +204,32 @@@ public class ColumnFamilyStore implemen
return metadata.compressionParameters().asThriftOptions();
}
- public void setCompressionParameters(Map<String,String> opts) throws
ConfigurationException
+ public void setCompressionParameters(Map<String,String> opts)
{
- metadata.compressionParameters = CompressionParameters.create(opts);
+ try
+ {
+ metadata.compressionParameters =
CompressionParameters.create(opts);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new IllegalArgumentException(e.getMessage());
+ }
}
- public void setCrcCheckChance(double crcCheckChance) throws
ConfigurationException
++ public void setCrcCheckChance(double crcCheckChance)
+ {
- for (SSTableReader sstable : table.getAllSSTables())
- if (sstable.compression)
-
sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance);
++ try
++ {
++ for (SSTableReader sstable : table.getAllSSTables())
++ if (sstable.compression)
++
sstable.getCompressionMetadata().parameters.setCrcCheckChance(crcCheckChance);
++ }
++ catch (ConfigurationException e)
++ {
++ throw new IllegalArgumentException(e.getMessage());
++ }
+ }
+
private ColumnFamilyStore(Table table,
String columnFamilyName,
IPartitioner partitioner,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 3388b9b,26da8be..0a6b077
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@@ -274,9 -212,14 +274,14 @@@ public interface ColumnFamilyStoreMBea
* Set the compression parameters
* @param opts map of string names to values
*/
- public void setCompressionParameters(Map<String,String> opts) throws
ConfigurationException;
+ public void setCompressionParameters(Map<String,String> opts);
/**
+ * Set new crc check chance
+ */
- public void setCrcCheckChance(double crcCheckChance) throws
ConfigurationException;
++ public void setCrcCheckChance(double crcCheckChance);
+
+ /**
* Disable automatic compaction.
*/
public void disableAutoCompaction();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index da35e92,3d3b95b..92812a6
---
a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++
b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@@ -112,18 -86,11 +112,18 @@@ public class CompressedRandomAccessRead
compressed = new byte[chunk.length];
if (source.read(compressed, 0, chunk.length) != chunk.length)
- throw new IOException(String.format("(%s) failed to read %d bytes
from offset %d.", getPath(), chunk.length, chunk.offset));
+ throw new CorruptBlockException(getPath(), chunk);
- validBufferBytes = metadata.compressor().uncompress(compressed, 0,
chunk.length, buffer, 0);
+ try
+ {
+ validBufferBytes = metadata.compressor().uncompress(compressed,
0, chunk.length, buffer, 0);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptBlockException(getPath(), chunk);
+ }
- if (metadata.parameters.crcChance >
FBUtilities.threadLocalRandom().nextDouble())
+ if (metadata.parameters.getCrcCheckChance() >
FBUtilities.threadLocalRandom().nextDouble())
{
checksum.update(buffer, 0, validBufferBytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionParameters.java
index 37cdcc4,05cc707..6448d84
--- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java
@@@ -23,16 -21,19 +23,19 @@@ import java.io.IOException
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
++import java.util.AbstractSet;
import java.util.HashMap;
import java.util.Map;
-import java.util.AbstractSet;
import java.util.Set;
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
public class CompressionParameters
{
@@@ -76,9 -84,41 +85,39 @@@
this.sstableCompressor = sstableCompressor;
this.chunkLength = chunkLength;
this.otherOptions = otherOptions;
- this.crcCheckChance = otherOptions.get(CRC_CHECK_CHANCE) == null
- ? DEFAULT_CRC_CHECK_CHANCE
- :
parseCrcCheckChance(otherOptions.get(CRC_CHECK_CHANCE));
+ String chance = otherOptions.get(CRC_CHECK_CHANCE);
- otherOptions.remove(CRC_CHECK_CHANCE);
- this.crcChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE :
Double.parseDouble(chance);
++ this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE :
parseCrcCheckChance(chance);
+ }
+
+ public void setCrcCheckChance(double crcCheckChance) throws
ConfigurationException
+ {
+ validateCrcCheckChance(crcCheckChance);
- logger.debug("Setting crcCheckChance to {}", crcCheckChance);
+ this.crcCheckChance = crcCheckChance;
+ }
+
+ public double getCrcCheckChance()
+ {
+ return this.crcCheckChance;
+ }
+
+ private static double parseCrcCheckChance(String crcCheckChance) throws
ConfigurationException
+ {
+ try
+ {
+ double chance = Double.parseDouble(crcCheckChance);
+ validateCrcCheckChance(chance);
+ return chance;
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("crc_check_chance should be a
double");
+ }
+ }
+
+ private static void validateCrcCheckChance(double crcCheckChance) throws
ConfigurationException
+ {
+ if (crcCheckChance < 0.0d || crcCheckChance > 1.0d)
+ throw new ConfigurationException("crc_check_chance should be
between 0.0 and 1.0");
}
public int chunkLength()
@@@ -116,7 -156,7 +155,7 @@@
Method method = compressorClass.getMethod("create", Map.class);
ICompressor compressor = (ICompressor)method.invoke(null,
compressionOptions);
// Check for unknown options
- Set<String> supportedOpts = compressor.supportedOptions();
- AbstractSet<String> supportedOpts =
Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
++ AbstractSet<String> supportedOpts =
Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS);
for (String provided : compressionOptions.keySet())
if (!supportedOpts.contains(provided))
throw new ConfigurationException("Unknown compression
options " + provided);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/620a4b7a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 8eadee8,0000000..c226cb6
mode 100644,000000..100644
---
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -1,156 -1,0 +1,156 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.concurrent.*;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * InputStream which reads data from underlining source with given {@link
CompressionInfo}.
+ */
+public class CompressedInputStream extends InputStream
+{
+ private final CompressionInfo info;
+ // chunk buffer
+ private final BlockingQueue<byte[]> dataBuffer;
+
+ // uncompressed bytes
+ private byte[] buffer;
+
+ // offset from the beginning of the buffer
+ protected long bufferOffset = 0;
+ // current position in stream
+ private long current = 0;
+ // number of bytes in the buffer that are actually valid
+ protected int validBufferBytes = -1;
+
+ private final Checksum checksum = new CRC32();
+
+ // raw checksum bytes
+ private final byte[] checksumBytes = new byte[4];
+
+ private long uncompressedBytes;
+
+ /**
+ * @param source Input source to read compressed data from
+ * @param info Compression info
+ */
+ public CompressedInputStream(InputStream source, CompressionInfo info)
+ {
+ this.info = info;
+ this.buffer = new byte[info.parameters.chunkLength()];
+ // buffer is limited to store up to 1024 chunks
+ this.dataBuffer = new
ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+
+ new Thread(new Reader(source, info, dataBuffer)).start();
+ }
+
+ public int read() throws IOException
+ {
+ if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ {
+ try
+ {
+ decompress(dataBuffer.take());
+ }
+ catch (InterruptedException e)
+ {
+ throw new EOFException("No chunk available");
+ }
+ }
+
+ assert current >= bufferOffset && current < bufferOffset +
validBufferBytes;
+
+ return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
+ }
+
+ public void position(long position)
+ {
+ assert position >= current : "stream can only read forward.";
+ current = position;
+ }
+
+ private void decompress(byte[] compressed) throws IOException
+ {
+ // uncompress
+ validBufferBytes =
info.parameters.sstableCompressor.uncompress(compressed, 0, compressed.length -
checksumBytes.length, buffer, 0);
+ uncompressedBytes += validBufferBytes;
+
+ // validate crc randomly
- if (info.parameters.crcChance >
FBUtilities.threadLocalRandom().nextDouble())
++ if (info.parameters.getCrcCheckChance() >
FBUtilities.threadLocalRandom().nextDouble())
+ {
+ checksum.update(buffer, 0, validBufferBytes);
+
+ System.arraycopy(compressed, compressed.length -
checksumBytes.length, checksumBytes, 0, checksumBytes.length);
+ if (Ints.fromByteArray(checksumBytes) != (int)
checksum.getValue())
+ throw new IOException("CRC unmatched");
+
+ // reset checksum object back to the original (blank) state
+ checksum.reset();
+ }
+
+ // buffer offset is always aligned
+ bufferOffset = current & ~(buffer.length - 1);
+ }
+
+ public long uncompressedBytes()
+ {
+ return uncompressedBytes;
+ }
+
+ static class Reader extends WrappedRunnable
+ {
+ private final InputStream source;
+ private final Iterator<CompressionMetadata.Chunk> chunks;
+ private final BlockingQueue<byte[]> dataBuffer;
+
+ Reader(InputStream source, CompressionInfo info,
BlockingQueue<byte[]> dataBuffer)
+ {
+ this.source = source;
+ this.chunks = Iterators.forArray(info.chunks);
+ this.dataBuffer = dataBuffer;
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ byte[] compressedWithCRC;
+ while (chunks.hasNext())
+ {
+ CompressionMetadata.Chunk chunk = chunks.next();
+
+ int readLength = chunk.length + 4; // read with CRC
+ compressedWithCRC = new byte[readLength];
+
+ int bufferRead = 0;
+ while (bufferRead < readLength)
+ bufferRead += source.read(compressedWithCRC, bufferRead,
readLength - bufferRead);
+ dataBuffer.put(compressedWithCRC);
+ }
+ }
+ }
+}