Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3
Branch: refs/heads/cassandra-3.1
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 105 ++++++++++---------
2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
-2.1.12
+2.2.4
+ * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
+ * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
+ * Reject index queries while the index is building (CASSANDRA-8505)
+ * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
+ * Fix JSON update with prepared statements (CASSANDRA-10631)
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose
phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) +Merged from 2.1: + * Add proper error handling to stream receiver (CASSANDRA-10774) * Warn or fail when changing cluster topology live (CASSANDRA-10243) * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213) * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 846524b,8773cab..dd56b8b --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; + import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Refs; @@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends public void run() { - Pair<String, String> kscf = Schema.instance.getCF(task.cfId); - if (kscf == null) + try { - // schema was dropped during streaming + Pair<String, String> kscf = Schema.instance.getCF(task.cfId); + if (kscf == null) + { + // schema was dropped during streaming + 
for (SSTableWriter writer : task.sstables) + writer.abort(); + task.sstables.clear(); + task.session.taskCompleted(task); + return; + } + ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + + File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L); + if (lockfiledir == null) + throw new IOError(new IOException("All disks full")); + StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID()); + lockfile.create(task.sstables); + List<SSTableReader> readers = new ArrayList<>(); for (SSTableWriter writer : task.sstables) - writer.abort(); - readers.add(writer.closeAndOpenReader()); ++ readers.add(writer.finish(true)); + lockfile.delete(); task.sstables.clear(); - return; - } - ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - - File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L); - if (lockfiledir == null) - throw new IOError(new IOException("All disks full")); - StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID()); - lockfile.create(task.sstables); - List<SSTableReader> readers = new ArrayList<>(); - for (SSTableWriter writer : task.sstables) - readers.add(writer.finish(true)); - lockfile.delete(); - task.sstables.clear(); - - try (Refs<SSTableReader> refs = Refs.ref(readers)) - { - // add sstables and build secondary indexes - cfs.addSSTables(readers); - cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); - //invalidate row and counter cache - if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) + try (Refs<SSTableReader> refs = Refs.ref(readers)) { - List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); - for (SSTableReader sstable : readers) - boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())); - Set<Bounds<Token>> nonOverlappingBounds = 
Bounds.getNonOverlappingBounds(boundsToInvalidate); + // add sstables and build secondary indexes + cfs.addSSTables(readers); + cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); - if (cfs.isRowCacheEnabled()) + //invalidate row and counter cache + if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) { - int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getColumnFamilyName()); - } - - if (cfs.metadata.isCounter()) - { - int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getColumnFamilyName()); + List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); + for (SSTableReader sstable : readers) + boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())); + Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); + + if (cfs.isRowCacheEnabled()) + { + int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + + "receive task completed.", task.session.planId(), invalidatedKeys, + cfs.keyspace.getName(), cfs.getColumnFamilyName()); + } + + if (cfs.metadata.isCounter()) + { + int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + + "receive task completed.", task.session.planId(), invalidatedKeys, + 
cfs.keyspace.getName(), cfs.getColumnFamilyName()); + } } } - } - task.session.taskCompleted(task); + task.session.taskCompleted(task); + } + catch (Throwable t) + { + logger.error("Error applying streamed data: ", t); + JVMStabilityInspector.inspectThrowable(t); + task.session.onError(t); + } } }