Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f0cd3261 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f0cd3261 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f0cd3261 Branch: refs/heads/cassandra-3.0 Commit: f0cd3261be946fd9835e8c978841bc930d1e07d9 Parents: 063b376 3557d2e Author: Yuki Morishita <yu...@apache.org> Authored: Mon Apr 11 11:30:28 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Mon Apr 11 11:30:28 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairRunnable.java | 10 ++ .../cassandra/service/ActiveRepairService.java | 11 ++ .../cassandra/service/StorageService.java | 28 ++++- .../progress/jmx/LegacyJMXProgressSupport.java | 107 +++++++++++++++++ .../jmx/LegacyJMXProgressSupportTest.java | 118 +++++++++++++++++++ 6 files changed, 269 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 47e6105,e935e57..8c40e63 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,29 -1,6 +1,30 @@@ -2.2.6 +3.0.6 + * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470) + * Notify indexers of expired rows during compaction (CASSANDRA-11329) + * Properly respond with ProtocolError when a v1/v2 native protocol + header is received (CASSANDRA-11464) + * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) +Merged from 2.2: + * Make deprecated repair methods backward-compatible with previous notification service (CASSANDRA-11430) * IncomingStreamingConnection version check message wrong (CASSANDRA-11462) + +3.0.5 + * Fix rare NPE on schema upgrade from 2.x to 3.x (CASSANDRA-10943) + * Improve backoff policy for cqlsh COPY FROM (CASSANDRA-11320) + * Improve IF NOT EXISTS check in CREATE INDEX (CASSANDRA-11131) + * Upgrade ohc to 0.4.3 + * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093) + * Allocate merkletrees with the correct size (CASSANDRA-11390) + * Support streaming pre-3.0 sstables (CASSANDRA-10990) + * Add backpressure to compressed commit log (CASSANDRA-10971) + * SSTableExport supports secondary index tables (CASSANDRA-11330) + * Fix sstabledump to include missing info in debug output (CASSANDRA-11321) + * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331) + * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208) + * Remove recursive call from CompositesSearcher (CASSANDRA-11304) + * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377) + * Fix sstableloader fail when using materialized view (CASSANDRA-11275) +Merged from 2.2: * DatabaseDescriptor should log stacktrace in case of Eception during seed provider creation (CASSANDRA-11312) * Use canonical path for directory in SSTable descriptor (CASSANDRA-10587) * Add cassandra-stress keystore option (CASSANDRA-9325) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index eb25457,d2b6ab6..354cb2a --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -230,8 -227,13 +230,13 @@@ public class RepairRunnable extends Wra { public void onSuccess(RepairSessionResult result) { + /** + * If the success message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ String message = String.format("Repair session %s for range %s finished", session.getId(), - session.getRange().toString()); + session.getRanges().toString()); logger.info(message); fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), @@@ -241,8 -243,13 +246,13 @@@ public void onFailure(Throwable t) { + /** + * If the failure message below is modified, it must also be updated on + * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport} + * for backward-compatibility support. + */ String message = String.format("Repair session %s for range %s failed with error %s", - session.getId(), session.getRange().toString(), t.getMessage()); + session.getId(), session.getRanges().toString(), t.getMessage()); logger.error(message, t); fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, progress.incrementAndGet(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 75573ac,dedc823..a1e5b6b --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -178,12 -171,8 +178,13 @@@ import org.apache.cassandra.utils.Wrapp import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport; + import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toList; +import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; +import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; + /** * This abstraction contains the token/identifier of this node * on the identifier space. This token gets gossiped around. @@@ -3036,16 -2965,17 +3046,17 @@@ public class StorageService extends Not { options.getRanges().addAll(getLocalRanges(keyspace)); } - if (columnFamilies != null) + if (tableNames != null) { - for (String columnFamily : columnFamilies) + for (String table : tableNames) { - options.getColumnFamilies().add(columnFamily); + options.getColumnFamilies().add(table); } } - return forceRepairAsync(keyspace, options); + return forceRepairAsync(keyspace, options, true); } + @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, @@@ -3058,9 -2988,10 +3069,10 @@@ { dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); } - return forceRepairAsync(keyspace, isSequential, dataCenters, null, primaryRange, fullRepair, columnFamilies); + return forceRepairAsync(keyspace, isSequential, dataCenters, null, primaryRange, fullRepair, tableNames); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@@ -3072,9 -3003,10 +3084,10 @@@ { return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), - dataCenters, hosts, fullRepair, columnFamilies); + dataCenters, hosts, fullRepair, tableNames); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@@ -3119,10 -3048,11 +3132,11 @@@ } logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", - repairingRange, keyspaceName, columnFamilies); + repairingRange, keyspaceName, tableNames); - return forceRepairAsync(keyspaceName, options); + return forceRepairAsync(keyspaceName, options, true); } + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, @@@ -3177,12 -3107,7 +3191,12 @@@ return repairingRange; } + public TokenFactory getTokenFactory() + { + return tokenMetadata.partitioner.getTokenFactory(); + } + - public int forceRepairAsync(String keyspace, RepairOption options) + public int forceRepairAsync(String keyspace, RepairOption options, boolean legacy) { if (options.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) return 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java index 0000000,fae6f2a..275673e mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java +++ b/src/java/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupport.java @@@ -1,0 -1,108 +1,107 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.utils.progress.jmx; + ++import java.util.Optional; + import java.util.concurrent.atomic.AtomicLong; + import java.util.regex.Pattern; + import javax.management.Notification; + import javax.management.NotificationBroadcasterSupport; + import javax.management.ObjectName; + -import com.google.common.base.Optional; - + import org.apache.cassandra.utils.progress.ProgressEvent; + import org.apache.cassandra.utils.progress.ProgressListener; + + import static org.apache.cassandra.service.ActiveRepairService.Status; + + /** + * ProgressListener that translates ProgressEvent to legacy JMX Notification message (backward compatibility support) + */ + public class LegacyJMXProgressSupport implements ProgressListener + { + protected static final Pattern SESSION_FAILED_MATCHER = Pattern.compile("Repair session .* for range .* failed with error .*"); + protected static final Pattern SESSION_SUCCESS_MATCHER = Pattern.compile("Repair session .* for range .* finished"); + + private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final ObjectName jmxObjectName; + + private final NotificationBroadcasterSupport broadcaster; + + public LegacyJMXProgressSupport(NotificationBroadcasterSupport broadcaster, + ObjectName jmxObjectName) + { + this.broadcaster = broadcaster; + this.jmxObjectName = jmxObjectName; + } + + @Override + public void progress(String tag, ProgressEvent event) + { + if (tag.startsWith("repair:")) + { + Optional<int[]> legacyUserData = getLegacyUserdata(tag, event); + if (legacyUserData.isPresent()) + { + Notification jmxNotification = new Notification("repair", jmxObjectName, notificationSerialNumber.incrementAndGet(), event.getMessage()); + jmxNotification.setUserData(legacyUserData.get()); + broadcaster.sendNotification(jmxNotification); + } + } + } + + protected static Optional<int[]> getLegacyUserdata(String tag, ProgressEvent event) + { + Optional<Status> status = getStatus(event); + if (status.isPresent()) + { + int[] result = new int[2]; + result[0] = getCmd(tag); + result[1] = status.get().ordinal(); + return Optional.of(result); + } - return Optional.absent(); ++ return Optional.empty(); + } + + protected static Optional<Status> getStatus(ProgressEvent event) + { + switch (event.getType()) + { + case START: + return Optional.of(Status.STARTED); + case COMPLETE: + return Optional.of(Status.FINISHED); + case PROGRESS: + if (SESSION_FAILED_MATCHER.matcher(event.getMessage()).matches()) + { + return Optional.of(Status.SESSION_FAILED); + } + else if (SESSION_SUCCESS_MATCHER.matcher(event.getMessage()).matches()) + { + return Optional.of(Status.SESSION_SUCCESS); + } + } + - return Optional.absent(); ++ return Optional.empty(); + } + + protected static int getCmd(String tag) + { + return Integer.valueOf(tag.split(":")[1]); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f0cd3261/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java index 0000000,efa4a27..70fb5cc mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java +++ b/test/unit/org/apache/cassandra/utils/progress/jmx/LegacyJMXProgressSupportTest.java @@@ -1,0 -1,118 +1,118 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.utils.progress.jmx; + ++import java.util.Optional; + import java.util.UUID; + -import com.google.common.base.Optional; + import org.junit.Test; + + import org.apache.cassandra.dht.Murmur3Partitioner; + import org.apache.cassandra.dht.Range; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.service.ActiveRepairService; + import org.apache.cassandra.utils.progress.ProgressEvent; + import org.apache.cassandra.utils.progress.ProgressEventType; + + import static org.junit.Assert.*; + + + public class LegacyJMXProgressSupportTest + { + + @Test + public void testSessionSuccess() + { + int cmd = 321; + String message = String.format("Repair session %s for range %s finished", UUID.randomUUID(), + new Range<Token>(new Murmur3Partitioner.LongToken(3), new Murmur3Partitioner.LongToken(4))); + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message)); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal() }, result.get()); + } + + @Test + public void testSessionFailed() + { + int cmd = 321; + String message = String.format("Repair session %s for range %s failed with error %s", UUID.randomUUID(), + new Range<Token>(new Murmur3Partitioner.LongToken(3), new Murmur3Partitioner.LongToken(4)).toString(), + new RuntimeException("error")); + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, message)); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal() }, result.get()); + } + + @Test + public void testStarted() + { + int cmd = 321; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.START, + 0, 100, "bla")); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.STARTED.ordinal() }, result.get()); + } + + @Test + public void testFinished() + { + int cmd = 321; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.COMPLETE, + 2, 10, "bla")); + assertTrue(result.isPresent()); + assertArrayEquals(new int[]{ cmd, ActiveRepairService.Status.FINISHED.ordinal() }, result.get()); + } + + /* + States not mapped to the legacy notification + */ + @Test + public void testNone() + { + int cmd = 33; + Optional<int[]> result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.ERROR, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 33; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.SUCCESS, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 43; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.PROGRESS, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 1; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.ABORT, 2, 10, "bla")); + assertFalse(result.isPresent()); + + cmd = 9; + result = LegacyJMXProgressSupport.getLegacyUserdata(String.format("repair:%d", cmd), + new ProgressEvent(ProgressEventType.NOTIFICATION, 2, 10, "bla")); + assertFalse(result.isPresent()); + } + + }