Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/716f02e9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/716f02e9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/716f02e9 Branch: refs/heads/trunk Commit: 716f02e9ae71d9f3f55260042f1dbdbb26dea800 Parents: 85cc390 118bea5 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu May 26 08:20:25 2016 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu May 26 08:23:05 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../repair/RepairMessageVerbHandler.java | 11 +-- .../apache/cassandra/repair/RepairRunnable.java | 2 +- .../cassandra/service/ActiveRepairService.java | 93 ++++++++++++++++++-- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 3 +- .../service/ActiveRepairServiceTest.java | 2 +- 7 files changed, 97 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index ddfb24f,190c2fa..062bb7b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,43 -1,9 +1,44 @@@ -2.2.7 +3.0.7 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530) + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705) + * Allow compaction strategies to disable early open (CASSANDRA-11754) + * Refactor Materialized View code (CASSANDRA-11475) + * Update Java Driver (CASSANDRA-11615) +Merged from 2.2: * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) * Possible memory leak in NIODataInputStream (CASSANDRA-11867) - * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) - * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395) +Merged from 2.1: ++ * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) + * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840) + * Do not consider local node a valid source during replace (CASSANDRA-11848) + * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) + * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) + + +3.0.6 + * Disallow creating view with a static column (CASSANDRA-11602) + * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593) + * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618) + * Fix queries with filtering on counter columns (CASSANDRA-11629) + * Improve tombstone printing in sstabledump (CASSANDRA-11655) + * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669) + * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600) + * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654) + * Ignore all LocalStrategy keyspaces for streaming and other related + operations (CASSANDRA-11627) + * Ensure columnfilter covers indexed columns for thrift 2i queries (CASSANDRA-11523) + * Only open one sstable scanner per sstable (CASSANDRA-11412) + * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410) + * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485) + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) + * Notify indexers of expired rows during compaction (CASSANDRA-11329) + * Properly respond with ProtocolError when a v1/v2 native protocol + header is received (CASSANDRA-11464) + * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) +Merged from 2.2: + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) * Exit JVM if JMX server fails to startup (CASSANDRA-11540) * Produce a heap dump when exiting on OOM (CASSANDRA-9861) http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index eeefec4,6e7922f..c536b13 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -71,12 -75,18 +71,13 @@@ public class RepairMessageVerbHandler i } columnFamilyStores.add(columnFamilyStore); } - CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from); - // note that we default isGlobal to true since old version always default to true: - boolean isGlobal = peerVersion == null || - peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 || - message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE); - logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion); ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, - columnFamilyStores, - prepareMessage.ranges, - prepareMessage.isIncremental, - prepareMessage.timestamp, - prepareMessage.isGlobal); + message.from, + columnFamilyStores, + prepareMessage.ranges, + prepareMessage.isIncremental, - isGlobal); ++ prepareMessage.timestamp, ++ prepareMessage.isGlobal); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 5aac886,5d010f9..9f249e4 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -246,10 -253,9 +253,10 @@@ public class ActiveRepairService implem return neighbors; } - public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) + public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { - registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal()); + long timestamp = System.currentTimeMillis(); - registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); ++ registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@@ -311,9 -320,15 +318,16 @@@ return parentRepairSession; } - public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal) - public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal) ++ public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, timestamp, isGlobal)); + if (!registeredForEndpointChanges) + { + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + registeredForEndpointChanges = true; + } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis())); ++ ++ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal)); } public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession) @@@ -433,12 -454,14 +453,14 @@@ private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>(); private final Collection<Range<Token>> ranges; public final Map<UUID, Set<String>> sstableMap = new HashMap<>(); - private final long repairedAt; public final boolean isIncremental; public final boolean isGlobal; + public final long repairedAt; + public final InetAddress coordinator; - public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt) ++ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) { + this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) { this.columnFamilyStores.put(cfs.metadata.cfId, cfs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 77d2d12,8b9ca08..1277209 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@@ -183,10 -196,10 +183,10 @@@ public class LeveledCompactionStrategyT assertTrue(strategy.getSSTableCountPerLevel()[2] > 0); Range<Token> range = new Range<>(Util.token(""), Util.token("")); - int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis()); + int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, true); - RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), true); + RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index eec29bc,892ced1..6aacae6 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@@ -33,10 -34,10 +33,11 @@@ import org.apache.cassandra.dht.IPartit import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; + import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; import static org.junit.Assert.assertEquals; @@@ -89,13 -90,12 +90,13 @@@ public class LocalSyncTaskTest extends Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), false); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, false); ++ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, System.currentTimeMillis(), false); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); + MerkleTrees tree1 = createInitialTree(desc); - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); + MerkleTrees tree2 = createInitialTree(desc); // change a range in one of the trees Token token = partirioner.midpoint(range.left, range.right); http://git-wip-us.apache.org/repos/asf/cassandra/blob/716f02e9/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index bd761db,7793660..db751cf --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@@ -223,10 -229,10 +223,10 @@@ public class ActiveRepairServiceTes public void testGetActiveRepairedSSTableRefs() { ColumnFamilyStore store = prepareColumnFamilyStore(); - Set<SSTableReader> original = store.getUnrepairedSSTables(); + Set<SSTableReader> original = store.getLiveSSTables(); UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, 0, false); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false); ++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, 0, false); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); //add all sstables to parent repair session