Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3 Branch: refs/heads/cassandra-3.0 Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed Parents: 0ad0de1 a549bd0 Author: Yuki Morishita <[email protected]> Authored: Tue Nov 3 10:03:34 2015 -0600 Committer: Yuki Morishita <[email protected]> Committed: Tue Nov 3 10:03:34 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/streaming/StreamReader.java | 12 ++++++++---- .../streaming/compress/CompressedStreamReader.java | 11 ++++++++--- 3 files changed, 17 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 1724f01,5c23acf..e0208c6 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,26 -1,12 +1,27 @@@ -2.2.4 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581) +3.0 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602) + * Don't use 'names query' read path for counters (CASSANDRA-10572) + * Fix backward compatibility for counters (CASSANDRA-10470) + * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581) + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604) + * Fix thrift cas operations with defined columns (CASSANDRA-10576) + * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606) + * Fix thrift get() queries with defined columns (CASSANDRA-10586) + * Fix marking of indexes as built and removed (CASSANDRA-10601) + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595) + * Fix batches on multiple tables (CASSANDRA-10554) + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569) + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975) + * Remove token generator (CASSANDRA-5261) + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562) + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421) + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360) + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367) +Merged from 2.2: * Expose phi values from failure detector via JMX and tweak debug and trace logging (CASSANDRA-9526) - * Fix RangeNamesQueryPager (CASSANDRA-10509) - * Deprecate Pig support (CASSANDRA-10542) - * Reduce contention getting instances of CompositeType (CASSANDRA-10433) Merged from 2.1: + * Fix streaming to catch exception so retry not fail (CASSANDRA-10557) * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092) * Support encrypted and plain traffic on the same port (CASSANDRA-10559) * Do STCS in DTCS windows (CASSANDRA-10276) http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 879491e,1ccebb0..6169494 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -97,25 -94,34 +97,29 @@@ public class StreamReade } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format); - DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); BytesReadTracker in = new BytesReadTracker(dis); - SSTableWriter writer = null; + StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata)); ++ SSTableMultiWriter writer = null; try { + writer = createWriter(cfs, totalSize, repairedAt, format); while (in.getBytesRead() < totalSize) { - writeRow(writer, in, cfs); - + writePartition(deserializer, writer, cfs); // TODO move this to BytesReadTracker session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } return writer; - } catch (Throwable e) + } + catch (Throwable e) { - SSTableMultiWriter.abortOrDie(writer); - + if (writer != null) + { - try - { - writer.abort(); - } - catch (Throwable e2) - { - // add abort error to original and continue so we can drain unread stream - e.addSuppressed(e2); - } ++ Throwable e2 = writer.abort(null); ++ // add abort error to original and continue so we can drain unread stream ++ e.addSuppressed(e2); + } drain(dis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e; http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 30cafef,facb906..fca6aa7 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format); - - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo); + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, + inputVersion.compressedChecksumType(), cfs::getCrcCheckChance); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); - SSTableWriter writer = null; + StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata)); ++ SSTableMultiWriter writer = null; try { + writer = createWriter(cfs, totalSize, repairedAt, format); for (Pair<Long, Long> section : sections) { assert cis.getTotalCompressedBytesRead() <= totalSize; @@@ -102,7 -102,18 +102,12 @@@ } catch (Throwable e) { - SSTableMultiWriter.abortOrDie(writer); + if (writer != null) + { - try - { - writer.abort(); - } - catch (Throwable e2) - { - // add abort error to original and continue so we can drain unread stream - e.addSuppressed(e2); - } ++ Throwable e2 = writer.abort(null); ++ // add abort error to original and continue so we can drain unread stream ++ e.addSuppressed(e2); + } drain(cis, in.getBytesRead()); if (e instanceof IOException) throw (IOException) e;
