Merge branch 'cassandra-3.0' into cassandra-3.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8458e4e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8458e4e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8458e4e3 Branch: refs/heads/cassandra-3.8 Commit: 8458e4e30744e8df6fcbe12aec286db047411385 Parents: ee60941 62ef861 Author: Yuki Morishita <yu...@apache.org> Authored: Tue Aug 9 16:56:29 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Aug 9 16:56:29 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Config.java | 4 ++ .../cassandra/config/DatabaseDescriptor.java | 5 -- .../cassandra/streaming/StreamReader.java | 26 +--------- .../cassandra/streaming/StreamSession.java | 36 +------------- .../compress/CompressedInputStream.java | 21 +++++++- .../compress/CompressedStreamReader.java | 10 ++-- .../streaming/messages/IncomingFileMessage.java | 22 ++------- .../streaming/messages/RetryMessage.java | 4 ++ .../org/apache/cassandra/utils/Throwables.java | 14 ++++++ .../compression/CompressedInputStreamTest.java | 52 +++++++++++++++++--- 11 files changed, 100 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index b7bbf72,f613c5f..8fbd774 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -22,47 -15,6 +22,48 @@@ Merged from 3.0 to connect with too low of a protocol version (CASSANDRA-11464) * NullPointerExpception when reading/compacting table (CASSANDRA-11988) * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144) +Merged from 2.2: ++ * Fix hanging stream session (CASSANDRA-10992) + * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371) + * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345) + * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465) + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979) + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) +Merged from 2.1: + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) + * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040) + + +3.8 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236) + * Fix hdr logging for single operation workloads (CASSANDRA-12145) + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073) + * Increase size of flushExecutor thread pool (CASSANDRA-12071) + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950) + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034) + * Improve details in compaction log message (CASSANDRA-12080) + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911) + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993) + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579) + * Move skip_stop_words filter before stemming (CASSANDRA-12078) + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957) + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002) + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966) + * Add cross-DC latency metrics (CASSANDRA-11596) + * Allow terms in selection clause (CASSANDRA-10783) + * Add bind variables to trace (CASSANDRA-11719) + * Switch counter shards' clock to timestamps (CASSANDRA-9811) + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853) + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718) + * Support older ant versions (CASSANDRA-11807) + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623) + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546) + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578) + * Faster streaming (CASSANDRA-9766) + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425) + * Add repaired percentage metric (CASSANDRA-11503) + * Add Change-Data-Capture (CASSANDRA-8844) +Merged from 3.0: * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107) * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393) * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 0cd6329,4ca7937..816f028 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -154,32 -153,12 +151,13 @@@ public class StreamReade { Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) - throw new IOException("Insufficient disk space to store " + totalSize + " bytes"); - desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format)); + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); - return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId)); + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), getHeader(cfs.metadata)); + StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum); + return writer; } - protected void drain(InputStream dis, long bytesRead) throws IOException - { - long toSkip = totalSize() - bytesRead; - - // InputStream.skip can return -1 if dis is inaccessible. - long skipped = dis.skip(toSkip); - if (skipped == -1) - return; - - toSkip = toSkip - skipped; - while (toSkip > 0) - { - skipped = dis.skip(toSkip); - if (skipped == -1) - break; - toSkip = toSkip - skipped; - } - } - protected long totalSize() { long size = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 3aaa1a3,d59849f..81abefa --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@@ -79,31 -90,32 +88,37 @@@ public class CompressedInputStream exte // buffer is limited to store up to 1024 chunks this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024)); this.crcCheckChanceSupplier = crcCheckChanceSupplier; + this.checksumType = checksumType; - new Thread(new Reader(source, info, dataBuffer)).start(); + new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start(); } - public int read() throws IOException + private void decompressNextChunk() throws IOException { + if (readException != null) + throw readException; + - if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + try { - try + byte[] compressedWithCRC = dataBuffer.take(); + if (compressedWithCRC == POISON_PILL) - throw new EOFException("No chunk available"); + { - byte[] compressedWithCRC = dataBuffer.take(); - if (compressedWithCRC == POISON_PILL) - { - assert readException != null; - throw readException; - } - decompress(compressedWithCRC); - } - catch (InterruptedException e) - { - throw new EOFException("No chunk available"); ++ assert readException != null; ++ throw readException; + } + decompress(compressedWithCRC); + } + catch (InterruptedException e) + { + throw new EOFException("No chunk available"); } + } + + @Override + public int read() throws IOException + { + if (current >= bufferOffset + buffer.length || validBufferBytes == -1) + decompressNextChunk(); assert current >= bufferOffset && current < bufferOffset + validBufferBytes; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 8dafa9c,bc87c8f..70b5765 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -35,9 -36,11 +35,11 @@@ import org.apache.cassandra.streaming.P import org.apache.cassandra.streaming.StreamReader; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.FileMessageHeader; -import org.apache.cassandra.io.util.TrackedInputStream; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; + import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; + /** * StreamReader that reads from streamed compressed SSTable */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java ----------------------------------------------------------------------