http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/DiskBoundaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index 72b5e2a..acfe71a 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -19,8 +19,9 @@ package org.apache.cassandra.db; import java.util.ArrayList; -import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Splitter; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.StorageService; @@ -68,7 +71,7 @@ public class DiskBoundaryManager private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) { - Collection<Range<Token>> localRanges; + RangesAtEndpoint localRanges; long ringVersion; TokenMetadata tmd; @@ -87,7 +90,7 @@ public class DiskBoundaryManager // Reason we use use the future settled TMD is that if we decommission a node, we want to stream // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled - localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddressAndPort()); + localRanges = cfs.keyspace.getReplicationStrategy().getAddressReplicas(tmd.cloneAfterAllSettled(), FBUtilities.getBroadcastAddressAndPort()); } logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion); } @@ -106,9 +109,18 @@ public class DiskBoundaryManager if (localRanges == null || localRanges.isEmpty()) return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion); - List<Range<Token>> sortedLocalRanges = Range.sort(localRanges); + // note that Range.sort unwraps any wraparound ranges, so we need to sort them here + List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream() + .filter(Replica::isFull) + .map(Replica::range) + .collect(Collectors.toList())); + List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream() + .filter(Replica::isTransient) + .map(Replica::range) + .collect(Collectors.toList())); + + List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs); - List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs); return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion); } @@ -121,15 +133,26 @@ public class DiskBoundaryManager * * The final entry in the returned list will always be the partitioner maximum tokens upper key bound */ - private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) { assert partitioner.splitter().isPresent(); + Splitter splitter = partitioner.splitter().get(); boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; - List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges); + + List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size()); + for (Range<Token> r : fullRanges) + weightedRanges.add(new Splitter.WeightedRange(1.0, r)); + + for (Range<Token> r : transientRanges) + weightedRanges.add(new Splitter.WeightedRange(0.1, r)); + + weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left)); + + List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, weightedRanges, dontSplitRanges); // If we can't split by ranges, split evenly to ensure utilisation of all disks if (dontSplitRanges && boundaries.size() < dataDirectories.length) - boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false); + boundaries = splitter.splitOwnedRanges(dataDirectories.length, weightedRanges, false); List<PartitionPosition> diskBoundaries = new ArrayList<>(); for (int i = 0; i < boundaries.size() - 1; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index c162697..436b7ef 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -503,6 +503,7 @@ public class Memtable implements Comparable<Memtable> toFlush.size(), ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, + false, sstableMetadataCollector, new SerializationHeader(true, cfs.metadata(), columns, stats), txn); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 8386048..9660f65 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.IOException; import java.util.Iterator; import org.apache.cassandra.exceptions.WriteTimeoutException; @@ -38,7 +37,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> Tracing.trace("Payload application resulted in WriteTimeout, not replying"); } - public void doVerb(MessageIn<Mutation> message, int id) throws IOException + public void doVerb(MessageIn<Mutation> message, int id) { // Check if there were any forwarding headers in this message InetAddressAndPort from = (InetAddressAndPort)message.parameters.get(ParameterType.FORWARD_FROM); @@ -69,7 +68,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> } } - private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort from) throws IOException + private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort from) { // tell the recipients who to send their ack to MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(ParameterType.FORWARD_FROM, from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 2bfb434..7eab016 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -24,7 +24,6 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; @@ -61,6 +60,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR private PartitionRangeReadCommand(boolean isDigest, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -69,7 +69,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR DataRange dataRange, IndexMetadata index) { - super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); + super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index); this.dataRange = dataRange; } @@ -82,6 +82,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(false, 0, + false, metadata, nowInSec, columnFilter, @@ -103,6 +104,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(false, 0, + false, metadata, nowInSec, ColumnFilter.all(metadata), @@ -151,6 +153,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR // on the ring. return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -164,6 +167,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -177,6 +181,21 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(true, digestVersion(), + false, + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange(), + indexMetadata()); + } + + public PartitionRangeReadCommand copyAsTransientQuery() + { + return new PartitionRangeReadCommand(false, + 0, + true, metadata(), nowInSec(), columnFilter(), @@ -191,6 +210,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -205,6 +225,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR { return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -406,6 +427,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR int version, boolean isDigest, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -415,7 +437,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); - return new PartitionRangeReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); + return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 0262140..736e3a3 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -34,7 +34,6 @@ import org.apache.cassandra.db.transform.RTBoundCloser; import org.apache.cassandra.db.transform.RTBoundValidator; import org.apache.cassandra.db.transform.StoppingTransformation; import org.apache.cassandra.db.transform.Transformation; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; @@ -68,6 +67,7 @@ public abstract class ReadCommand extends AbstractReadQuery private final Kind kind; private final boolean isDigestQuery; + private final boolean acceptsTransient; // if a digest query, the version for which the digest is expected. Ignored if not a digest. private int digestVersion; @@ -80,6 +80,7 @@ public abstract class ReadCommand extends AbstractReadQuery int version, boolean isDigest, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -104,6 +105,7 @@ public abstract class ReadCommand extends AbstractReadQuery protected ReadCommand(Kind kind, boolean isDigestQuery, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -115,6 +117,7 @@ public abstract class ReadCommand extends AbstractReadQuery this.kind = kind; this.isDigestQuery = isDigestQuery; this.digestVersion = digestVersion; + this.acceptsTransient = acceptsTransient; this.index = index; } @@ -176,6 +179,14 @@ public abstract class ReadCommand extends AbstractReadQuery } /** + * @return Whether this query expects only a transient data response, or a full response + */ + public boolean acceptsTransient() + { + return acceptsTransient; + } + + /** * Index (metadata) chosen for this query. Can be null. * * @return index (metadata) chosen for this query @@ -210,6 +221,7 @@ public abstract class ReadCommand extends AbstractReadQuery * Returns a copy of this command with isDigestQuery set to true. */ public abstract ReadCommand copyAsDigestQuery(); + public abstract ReadCommand copyAsTransientQuery(); protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController); @@ -569,6 +581,16 @@ public abstract class ReadCommand extends AbstractReadQuery return (flags & 0x01) != 0; } + private static boolean acceptsTransient(int flags) + { + return (flags & 0x08) != 0; + } + + private static int acceptsTransientFlag(boolean acceptsTransient) + { + return acceptsTransient ? 0x08 : 0; + } + // We don't set this flag anymore, but still look if we receive a // command with it set in case someone is using thrift a mixed 3.0/4.0+ // cluster (which is unsupported). This is also a reminder for not @@ -592,7 +614,11 @@ public abstract class ReadCommand extends AbstractReadQuery public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { out.writeByte(command.kind.ordinal()); - out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata())); + out.writeByte( + digestFlag(command.isDigestQuery()) + | indexFlag(null != command.indexMetadata()) + | acceptsTransientFlag(command.acceptsTransient()) + ); if (command.isDigestQuery()) out.writeUnsignedVInt(command.digestVersion()); command.metadata().id.serialize(out); @@ -611,6 +637,7 @@ public abstract class ReadCommand extends AbstractReadQuery Kind kind = Kind.values()[in.readByte()]; int flags = in.readByte(); boolean isDigest = isDigest(flags); + boolean acceptsTransient = acceptsTransient(flags); // Shouldn't happen or it's a user error (see comment above) but // better complain loudly than doing the wrong thing. if (isForThrift(flags)) @@ -628,7 +655,7 @@ public abstract class ReadCommand extends AbstractReadQuery DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator); IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null; - return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index); } private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SSTableImporter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java index c919d25..7597f82 100644 --- a/src/java/org/apache/cassandra/db/SSTableImporter.java +++ b/src/java/org/apache/cassandra/db/SSTableImporter.java @@ -349,9 +349,9 @@ public class SSTableImporter } if (options.clearRepaired) { - descriptor.getMetadataSerializer().mutateRepaired(descriptor, - ActiveRepairService.UNREPAIRED_SSTABLE, - null); + descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, + null, + false); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 97ab210..c81185e 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -36,7 +36,6 @@ import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.Transformation; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReadsListener; @@ -71,6 +70,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar @VisibleForTesting protected SinglePartitionReadCommand(boolean isDigest, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -80,7 +80,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar ClusteringIndexFilter clusteringIndexFilter, IndexMetadata index) { - super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); + super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index); assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; @@ -111,6 +111,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { return new SinglePartitionReadCommand(false, 0, + false, metadata, nowInSec, columnFilter, @@ -286,6 +287,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -300,6 +302,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { return new SinglePartitionReadCommand(true, digestVersion(), + acceptsTransient(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + partitionKey(), + clusteringIndexFilter(), + indexMetadata()); + } + + public SinglePartitionReadCommand copyAsTransientQuery() + { + return new SinglePartitionReadCommand(false, + 0, + true, metadata(), nowInSec(), columnFilter(), @@ -315,6 +333,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), + acceptsTransient(), metadata(), nowInSec(), columnFilter(), @@ -1064,6 +1083,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar int version, boolean isDigest, int digestVersion, + boolean acceptsTransient, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, @@ -1074,7 +1094,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar { DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, DatabaseDescriptor.getMaxValueSize())); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); - return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index); + return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index fb9e889..ff070a3 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -29,13 +29,15 @@ import java.util.stream.StreamSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,8 @@ import static java.util.Collections.singletonMap; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; public final class SystemKeyspace { @@ -95,12 +99,10 @@ public final class SystemKeyspace public static final String LOCAL = "local"; public static final String PEERS_V2 = "peers_v2"; public static final String PEER_EVENTS_V2 = "peer_events_v2"; - public static final String RANGE_XFERS = "range_xfers"; public static final String COMPACTION_HISTORY = "compaction_history"; public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; - public static final String AVAILABLE_RANGES = "available_ranges"; - public static final String TRANSFERRED_RANGES = "transferred_ranges"; + public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2"; public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2"; public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress"; public static final String BUILT_VIEWS = "built_views"; @@ -110,6 +112,8 @@ public final class SystemKeyspace @Deprecated public static final String LEGACY_PEERS = "peers"; @Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events"; @Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges"; + @Deprecated public static final String LEGACY_AVAILABLE_RANGES = "available_ranges"; + public static final TableMetadata Batches = parse(BATCHES, @@ -207,15 +211,6 @@ public final class SystemKeyspace + "PRIMARY KEY ((peer), peer_port))") .build(); - private static final TableMetadata RangeXfers = - parse(RANGE_XFERS, - "ranges requested for transfer", - "CREATE TABLE %s (" - + "token_bytes blob," - + "requested_at timestamp," - + "PRIMARY KEY ((token_bytes)))") - .build(); - private static final TableMetadata CompactionHistory = parse(COMPACTION_HISTORY, "week-long compaction history", @@ -256,14 +251,15 @@ public final class SystemKeyspace + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") .build(); - private static final TableMetadata AvailableRanges = - parse(AVAILABLE_RANGES, - "available keyspace/ranges during bootstrap/replace that are ready to be served", - "CREATE TABLE %s (" - + "keyspace_name text," - + "ranges set<blob>," - + "PRIMARY KEY ((keyspace_name)))") - .build(); + private static final TableMetadata AvailableRangesV2 = + parse(AVAILABLE_RANGES_V2, + "available keyspace/ranges during bootstrap/replace that are ready to be served", + "CREATE TABLE %s (" + + "keyspace_name text," + + "full_ranges set<blob>," + + "transient_ranges set<blob>," + + "PRIMARY KEY ((keyspace_name)))") + .build(); private static final TableMetadata TransferredRangesV2 = parse(TRANSFERRED_RANGES_V2, @@ -366,6 +362,16 @@ public final class SystemKeyspace + "PRIMARY KEY ((operation, keyspace_name), peer))") .build(); + @Deprecated + private static final TableMetadata LegacyAvailableRanges = + parse(LEGACY_AVAILABLE_RANGES, + "available keyspace/ranges during bootstrap/replace that are ready to be served", + "CREATE TABLE %s (" + + "keyspace_name text," + + "ranges set<blob>," + + "PRIMARY KEY ((keyspace_name)))") + .build(); + private static TableMetadata.Builder parse(String table, String description, String cql) { return CreateTableStatement.parse(format(cql, table), SchemaConstants.SYSTEM_KEYSPACE_NAME) @@ -390,11 +396,11 @@ public final class SystemKeyspace LegacyPeers, PeerEventsV2, LegacyPeerEvents, - RangeXfers, CompactionHistory, SSTableActivity, SizeEstimates, - AvailableRanges, + AvailableRangesV2, + LegacyAvailableRanges, TransferredRangesV2, LegacyTransferredRanges, ViewBuildsInProgress, @@ -1270,36 +1276,38 @@ public final class SystemKeyspace executeInternal(cql, keyspace, table); } - public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges) + public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> completedTransientRanges) { - String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?"; - Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size()); - for (Range<Token> range : completedRanges) - { - rangesToUpdate.add(rangeToBytes(range)); - } - executeInternal(format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace); + String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, transient_ranges = transient_ranges + ? WHERE keyspace_name = ?"; + executeInternal(format(cql, AVAILABLE_RANGES_V2), + completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()), + completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()), + keyspace); } - public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) + public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner) { - Set<Range<Token>> result = new HashSet<>(); String query = "SELECT * FROM system.%s WHERE keyspace_name=?"; - UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES), keyspace); + UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace); + InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost(); + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); for (UntypedResultSet.Row row : rs) { - Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); - for (ByteBuffer rawRange : rawRanges) - { - result.add(byteBufferToRange(rawRange, partitioner)); - } + Optional.ofNullable(row.getSet("full_ranges", BytesType.instance)) + .ifPresent(full_ranges -> full_ranges.stream() + .map(buf -> byteBufferToRange(buf, partitioner)) + .forEach(range -> builder.add(fullReplica(endpoint, range)))); + Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance)) + .ifPresent(transient_ranges -> transient_ranges.stream() + .map(buf -> byteBufferToRange(buf, partitioner)) + .forEach(range -> builder.add(transientReplica(endpoint, range)))); } - return ImmutableSet.copyOf(result); + return builder.build(); } public static void resetAvailableRanges() { - ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES); + ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES_V2); availableRanges.truncateBlocking(); } @@ -1405,7 +1413,13 @@ public final class SystemKeyspace return result.one().getString("release_version"); } - private static ByteBuffer rangeToBytes(Range<Token> range) + @VisibleForTesting + public static Set<Range<Token>> rawRangesToRangeSet(Set<ByteBuffer> rawRanges, IPartitioner partitioner) + { + return rawRanges.stream().map(buf -> byteBufferToRange(buf, partitioner)).collect(Collectors.toSet()); + } + + static ByteBuffer rangeToBytes(Range<Token> range) { try (DataOutputBuffer out = new DataOutputBuffer()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java index ea5ff59..e0a58ba 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java @@ -18,6 +18,11 @@ package org.apache.cassandra.db; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +50,9 @@ public class SystemKeyspaceMigrator40 static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2); static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES); static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2); + static final String legacyAvailableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_AVAILABLE_RANGES); + static final String availableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.AVAILABLE_RANGES_V2); + private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceMigrator40.class); @@ -55,6 +63,7 @@ public class SystemKeyspaceMigrator40 migratePeers(); migratePeerEvents(); migrateTransferredRanges(); + migrateAvailableRanges(); } private static void migratePeers() @@ -181,4 +190,40 @@ public class SystemKeyspaceMigrator40 logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyTransferredRangesName, transferredRangesName); } + static void migrateAvailableRanges() + { + ColumnFamilyStore newAvailableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.AVAILABLE_RANGES_V2); + + if (!newAvailableRanges.isEmpty()) + return; + + logger.info("{} table was empty, migrating legacy {} to {}", availableRangesName, legacyAvailableRangesName, availableRangesName); + + String query = String.format("SELECT * FROM %s", + legacyAvailableRangesName); + + String insert = String.format("INSERT INTO %s (" + + "keyspace_name, " + + "full_ranges, " + + "transient_ranges) " + + " values ( ?, ?, ? )", + availableRangesName); + + UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000); + int transferred = 0; + for (UntypedResultSet.Row row : rows) + { + logger.debug("Transferring row {}", transferred); + String keyspace = row.getString("keyspace_name"); + Set<ByteBuffer> ranges = Optional.ofNullable(row.getSet("ranges", BytesType.instance)).orElse(Collections.emptySet()); + QueryProcessor.executeInternal(insert, + keyspace, + ranges, + Collections.emptySet()); + transferred++; + } + + logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyAvailableRangesName, availableRangesName); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/TableCQLHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TableCQLHelper.java b/src/java/org/apache/cassandra/db/TableCQLHelper.java index 550a6d6..f97bebc 100644 --- a/src/java/org/apache/cassandra/db/TableCQLHelper.java +++ b/src/java/org/apache/cassandra/db/TableCQLHelper.java @@ -310,6 +310,7 @@ public class TableCQLHelper builder.append("\n\tAND max_index_interval = ").append(tableParams.maxIndexInterval); builder.append("\n\tAND memtable_flush_period_in_ms = ").append(tableParams.memtableFlushPeriodInMs); builder.append("\n\tAND speculative_retry = '").append(tableParams.speculativeRetry).append("'"); + builder.append("\n\tAND speculative_write_threshold = '").append(tableParams.speculativeWriteThreshold).append("'"); builder.append("\n\tAND comment = ").append(singleQuote(tableParams.comment)); builder.append("\n\tAND caching = ").append(toCQL(tableParams.caching.asMap())); builder.append("\n\tAND compaction = ").append(toCQL(tableParams.compaction.asMap())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 59bdce6..28ea90a 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -530,12 +530,13 @@ public abstract class AbstractCompactionStrategy long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, MetadataCollector meta, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { - return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, cfs.metadata, meta, header, indexes, txn); + return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, txn); } public boolean supportsEarlyOpen() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java index dc16261..24bea06 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java @@ -158,11 +158,11 @@ public abstract class AbstractStrategyHolder * groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo, * none of the others should. */ - public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair); + public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient); public boolean managesSSTable(SSTableReader sstable) { - return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair()); + return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient()); } public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable); @@ -193,6 +193,7 @@ public abstract class AbstractStrategyHolder long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, @@ -203,4 +204,6 @@ public abstract class AbstractStrategyHolder * if it's not held by this holder */ public abstract int getStrategyIndex(AbstractCompactionStrategy strategy); + + public abstract boolean containsSSTable(SSTableReader sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index a872fea..2a56650 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -23,7 +23,7 @@ import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.LongPredicate; +import java.util.function.Predicate; import java.util.stream.Collectors; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -34,6 +34,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.*; import com.google.common.util.concurrent.*; + +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +46,6 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.repair.ValidationPartitionIterator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -71,7 +73,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.TableMetrics; -import org.apache.cassandra.repair.Validator; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; @@ -81,6 +82,8 @@ import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.concurrent.Refs; import static java.util.Collections.singleton; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; /** * <p> @@ -509,7 +512,10 @@ public class CompactionManager implements CompactionManagerMBean return AllSSTableOpStatus.ABORTED; } // if local ranges is empty, it means no data should remain - final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName()); + final Set<Range<Token>> allRanges = replicas.ranges(); + final Set<Range<Token>> transientRanges = replicas.filter(Replica::isTransient).ranges(); + final Set<Range<Token>> fullRanges = replicas.filter(Replica::isFull).ranges(); final boolean hasIndexes = cfStore.indexManager.hasIndexes(); return parallelAllSSTableOperation(cfStore, new OneSSTableOperation() @@ -525,8 +531,8 @@ public class CompactionManager implements CompactionManagerMBean @Override public void execute(LifecycleTransaction txn) throws IOException { - CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds()); - doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes); + CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds()); + doCleanupOne(cfStore, txn, cleanupStrategy, replicas.ranges(), fullRanges, transientRanges, hasIndexes); } }, jobs, OperationType.CLEANUP); } @@ -574,9 +580,8 @@ public class CompactionManager implements CompactionManagerMBean logger.info("Partitioner does not support splitting"); return AllSSTableOpStatus.ABORTED; } - final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName()); - if (r.isEmpty()) + if (StorageService.instance.getLocalReplicas(cfs.keyspace.getName()).isEmpty()) { logger.info("Relocate cannot run before a node has joined the ring"); return AllSSTableOpStatus.ABORTED; @@ -643,7 +648,11 @@ public class CompactionManager implements CompactionManagerMBean /** * Splits the given token ranges of the given sstables into a pending repair silo */ - public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> sstables, LifecycleTransaction txn, UUID sessionId) + public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore cfs, + RangesAtEndpoint tokenRanges, + Refs<SSTableReader> sstables, + LifecycleTransaction txn, + UUID sessionId) { Runnable runnable = new WrappedRunnable() { @@ -651,7 +660,7 @@ public class CompactionManager implements CompactionManagerMBean { try (TableMetrics.TableTimer.Context ctx = cfs.metric.anticompactionTime.time()) { - performAnticompaction(cfs, ranges, sstables, txn, ActiveRepairService.UNREPAIRED_SSTABLE, sessionId, sessionId); + performAnticompaction(cfs, tokenRanges, sstables, txn, sessionId); } } }; @@ -673,48 +682,69 @@ public class CompactionManager implements CompactionManagerMBean } /** + * for sstables that are fully contained in the given ranges, just rewrite their metadata with + * the pending repair id and remove them from the transaction + */ + private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs, + Refs<SSTableReader> refs, + Iterator<SSTableReader> sstableIterator, + Collection<Range<Token>> ranges, + LifecycleTransaction txn, + UUID sessionID, + boolean isTransient) throws IOException + { + if (ranges.isEmpty()) + return; + + List<Range<Token>> normalizedRanges = Range.normalize(ranges); + + Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, sessionID); + + cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables)); + cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, UNREPAIRED_SSTABLE, sessionID, isTransient); + // since we're just re-writing the sstable metdata for the fully contained sstables, we don't want + // them obsoleted when the anti-compaction is complete. So they're removed from the transaction here + txn.cancel(fullyContainedSSTables); + refs.release(fullyContainedSSTables); + } + + /** * Make sure the {validatedForRepair} are marked for compaction before calling this. * * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)). * * @param cfs - * @param ranges Ranges that the repair was carried out on + * @param ranges token ranges to be repaired * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them. - * @param parentRepairSession parent repair session ID + * @param sessionID the repair session we're anti-compacting for * @throws InterruptedException * @throws IOException */ public void performAnticompaction(ColumnFamilyStore cfs, - Collection<Range<Token>> ranges, + RangesAtEndpoint ranges, Refs<SSTableReader> validatedForRepair, LifecycleTransaction txn, - long repairedAt, - UUID pendingRepair, - UUID parentRepairSession) throws InterruptedException, IOException + UUID sessionID) throws IOException { try { - ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews"); + Preconditions.checkArgument(!ranges.isEmpty(), "No ranges to anti-compact"); if (logger.isInfoEnabled()) - logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); + logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(sessionID), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); if (logger.isTraceEnabled()) - logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); - Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - - Iterator<SSTableReader> sstableIterator = sstables.iterator(); - List<Range<Token>> normalizedRanges = Range.normalize(ranges); + logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(sessionID), ranges); - Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, parentRepairSession); + Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); + validateSSTableBoundsForAnticompaction(sessionID, sstables, ranges); + mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), ranges.fullRanges(), txn, sessionID, false); + mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), ranges.transientRanges(), txn, sessionID, true); - cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables)); - cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt, pendingRepair); - txn.cancel(fullyContainedSSTables); - validatedForRepair.release(fullyContainedSSTables); assert txn.originals().equals(sstables); if (!sstables.isEmpty()) - doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair); + doAntiCompaction(cfs, ranges, txn, sessionID); txn.finish(); } finally @@ -723,7 +753,28 @@ public class CompactionManager implements CompactionManagerMBean txn.close(); } - logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(parentRepairSession)); + logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(sessionID)); + } + + static void validateSSTableBoundsForAnticompaction(UUID sessionID, + Collection<SSTableReader> sstables, + RangesAtEndpoint ranges) + { + List<Range<Token>> normalizedRanges = Range.normalize(ranges.ranges()); + for (SSTableReader sstable : sstables) + { + Bounds<Token> bounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); + + if (!Iterables.any(normalizedRanges, r -> (r.contains(bounds.left) && r.contains(bounds.right)) || r.intersects(bounds))) + { + // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here + String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.", + PreviewKind.NONE.logPrefix(sessionID), sstable, bounds, normalizedRanges); + logger.error(message); + throw new IllegalStateException(message); + } + } + } @VisibleForTesting @@ -736,8 +787,6 @@ public class CompactionManager implements CompactionManagerMBean Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); - boolean shouldAnticompact = false; - for (Range<Token> r : normalizedRanges) { // ranges are normalized - no wrap around - if first and last are contained we know that all tokens are contained in the range @@ -746,23 +795,13 @@ public class CompactionManager implements CompactionManagerMBean logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r); fullyContainedSSTables.add(sstable); sstableIterator.remove(); - shouldAnticompact = true; break; } else if (r.intersects(sstableBounds)) { logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, r); - shouldAnticompact = true; } } - - if (!shouldAnticompact) - { - // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here - String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, normalizedRanges); - logger.error(message); - throw new IllegalStateException(message); - } } return fullyContainedSSTables; } @@ -914,7 +953,10 @@ public class CompactionManager implements CompactionManagerMBean { ColumnFamilyStore cfs = entry.getKey(); Keyspace keyspace = cfs.keyspace; - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); + final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName()); + final Set<Range<Token>> allRanges = replicas.ranges(); + final Set<Range<Token>> transientRanges = replicas.filter(Replica::isTransient).ranges(); + final Set<Range<Token>> fullRanges = replicas.filter(Replica::isFull).ranges(); boolean hasIndexes = cfs.indexManager.hasIndexes(); SSTableReader sstable = lookupSSTable(cfs, entry.getValue()); @@ -924,10 +966,10 @@ public class CompactionManager implements CompactionManagerMBean } else { - CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, FBUtilities.nowInSeconds()); + CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, allRanges, transientRanges, sstable.isRepaired(), FBUtilities.nowInSeconds()); try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP)) { - doCleanupOne(cfs, txn, cleanupStrategy, ranges, hasIndexes); + doCleanupOne(cfs, txn, cleanupStrategy, allRanges, fullRanges, transientRanges, hasIndexes); } catch (IOException e) { @@ -1104,22 +1146,33 @@ public class CompactionManager implements CompactionManagerMBean * * @throws IOException */ - private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException + private void doCleanupOne(final ColumnFamilyStore cfs, + LifecycleTransaction txn, + CleanupStrategy cleanupStrategy, + Collection<Range<Token>> allRanges, + Collection<Range<Token>> fullRanges, + Collection<Range<Token>> transientRanges, + boolean hasIndexes) throws IOException { assert !cfs.isIndex(); SSTableReader sstable = txn.onlyOne(); // if ranges is empty and no index, entire sstable is discarded - if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges)) + if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(allRanges)) { txn.obsoleteOriginals(); txn.finish(); return; } - if (!needsCleanup(sstable, ranges)) + + boolean needsCleanupFull = needsCleanup(sstable, fullRanges); + boolean needsCleanupTransient = needsCleanup(sstable, transientRanges); + //If there are no ranges for which the table needs cleanup either due to lack of intersection or lack + //of the table being repaired. + if (!needsCleanupFull && (!needsCleanupTransient || !sstable.isRepaired())) { - logger.trace("Skipping {} for cleanup; all rows should be kept", sstable); + logger.trace("Skipping {} for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}", sstable, needsCleanupFull, needsCleanupTransient, sstable.isRepaired()); return; } @@ -1150,7 +1203,7 @@ public class CompactionManager implements CompactionManagerMBean CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { StatsMetadata metadata = sstable.getSSTableMetadata(); - writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, txn)); + writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn)); long lastBytesScanned = 0; while (ci.hasNext()) @@ -1218,11 +1271,18 @@ public class CompactionManager implements CompactionManagerMBean this.nowInSec = nowInSec; } - public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) + public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec) { - return cfs.indexManager.hasIndexes() - ? new Full(cfs, ranges, nowInSec) - : new Bounded(cfs, ranges, nowInSec); + if (cfs.indexManager.hasIndexes()) + { + if (!transientRanges.isEmpty()) + { + //Shouldn't have been possible to create this situation + throw new AssertionError("Can't have indexes and transient ranges"); + } + return new Full(cfs, ranges, nowInSec); + } + return new Bounded(cfs, ranges, transientRanges, isRepaired, nowInSec); } public abstract ISSTableScanner getScanner(SSTableReader sstable); @@ -1230,7 +1290,10 @@ public class CompactionManager implements CompactionManagerMBean private static final class Bounded extends CleanupStrategy { - public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec) + private final Collection<Range<Token>> transientRanges; + private final boolean isRepaired; + + public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec) { super(ranges, nowInSec); cacheCleanupExecutor.submit(new Runnable() @@ -1241,12 +1304,23 @@ public class CompactionManager implements CompactionManagerMBean cfs.cleanupCache(); } }); + this.transientRanges = transientRanges; + this.isRepaired = isRepaired; } @Override public ISSTableScanner getScanner(SSTableReader sstable) { - return sstable.getScanner(ranges); + //If transient replication is enabled and there are transient ranges + //then cleanup should remove any partitions that are repaired and in the transient range + //as they should already be synchronized at other full replicas. + //So just don't scan the portion of the table containing the repaired transient ranges + Collection<Range<Token>> rangesToScan = ranges; + if (isRepaired) + { + rangesToScan = Collections2.filter(ranges, range -> !transientRanges.contains(range)); + } + return sstable.getScanner(rangesToScan); } @Override @@ -1291,6 +1365,7 @@ public class CompactionManager implements CompactionManagerMBean long expectedBloomFilterSize, long repairedAt, UUID pendingRepair, + boolean isTransient, SSTableReader sstable, LifecycleTransaction txn) { @@ -1301,6 +1376,7 @@ public class CompactionManager implements CompactionManagerMBean expectedBloomFilterSize, repairedAt, pendingRepair, + isTransient, sstable.getSSTableLevel(), sstable.header, cfs.indexManager.listIndexes(), @@ -1312,6 +1388,7 @@ public class CompactionManager implements CompactionManagerMBean int expectedBloomFilterSize, long repairedAt, UUID pendingRepair, + boolean isTransient, Collection<SSTableReader> sstables, LifecycleTransaction txn) { @@ -1335,6 +1412,7 @@ public class CompactionManager implements CompactionManagerMBean (long) expectedBloomFilterSize, repairedAt, pendingRepair, + isTransient, cfs.metadata, new MetadataCollector(sstables, cfs.metadata().comparator, minLevel), SerializationHeader.make(cfs.metadata(), sstables), @@ -1347,16 +1425,19 @@ public class CompactionManager implements CompactionManagerMBean * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted * and subsequently deleted. * @param cfs - * @param repaired a transaction over the repaired sstables to anticompacy - * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via - * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field. + * @param txn a transaction over the repaired sstables to anticompact + * @param ranges full and transient ranges to be placed into one of the new sstables. The repaired table will be tracked via + * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#pendingRepair} field. */ - private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt, UUID pendingRepair) + private void doAntiCompaction(ColumnFamilyStore cfs, + RangesAtEndpoint ranges, + LifecycleTransaction txn, + UUID pendingRepair) { - logger.info("Performing anticompaction on {} sstables", repaired.originals().size()); + logger.info("Performing anticompaction on {} sstables", txn.originals().size()); //Group SSTables - Set<SSTableReader> sstables = repaired.originals(); + Set<SSTableReader> sstables = txn.originals(); // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data. // Although anti-compaction could work on repaired sstables as well and would result in having more accurate @@ -1366,101 +1447,111 @@ public class CompactionManager implements CompactionManagerMBean cfs.metric.bytesAnticompacted.inc(SSTableReader.getTotalBytes(unrepairedSSTables)); Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables); - // iterate over sstables to check if the repaired / unrepaired ranges intersect them. + // iterate over sstables to check if the full / transient / unrepaired ranges intersect them. int antiCompactedSSTableCount = 0; for (Collection<SSTableReader> sstableGroup : groupedSSTables) { - try (LifecycleTransaction txn = repaired.split(sstableGroup)) + try (LifecycleTransaction groupTxn = txn.split(sstableGroup)) { - int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt, pendingRepair); + int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, pendingRepair); antiCompactedSSTableCount += antiCompacted; } } String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s)."; - logger.info(format, repaired.originals().size(), antiCompactedSSTableCount); + logger.info(format, txn.originals().size(), antiCompactedSSTableCount); } - private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, - LifecycleTransaction anticompactionGroup, long repairedAt, UUID pendingRepair) + private int antiCompactGroup(ColumnFamilyStore cfs, + RangesAtEndpoint ranges, + LifecycleTransaction txn, + UUID pendingRepair) { + Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full or transient range"); long groupMaxDataAge = -1; - for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();) + for (Iterator<SSTableReader> i = txn.originals().iterator(); i.hasNext();) { SSTableReader sstable = i.next(); if (groupMaxDataAge < sstable.maxDataAge) groupMaxDataAge = sstable.maxDataAge; } - if (anticompactionGroup.originals().size() == 0) + if (txn.originals().size() == 0) { logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available"); return 0; } - logger.info("Anticompacting {}", anticompactionGroup); - Set<SSTableReader> sstableAsSet = anticompactionGroup.originals(); + logger.info("Anticompacting {}", txn); + Set<SSTableReader> sstableAsSet = txn.originals(); File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION)); - long repairedKeyCount = 0; - long unrepairedKeyCount = 0; int nowInSec = FBUtilities.nowInSeconds(); CompactionStrategyManager strategy = cfs.getCompactionStrategyManager(); - try (SSTableRewriter repairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge); - SSTableRewriter unRepairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals()); + try (SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge); + SSTableRewriter transWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge); + SSTableRewriter unrepairedWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge); + + AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals()); CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); - repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, pendingRepair, sstableAsSet, anticompactionGroup)); - unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, null, sstableAsSet, anticompactionGroup)); - Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges); + fullWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, false, sstableAsSet, txn)); + transWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, true, sstableAsSet, txn)); + unrepairedWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, sstableAsSet, txn)); + + Predicate<Token> fullChecker = !ranges.fullRanges().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.fullRanges()) : t -> false; + Predicate<Token> transChecker = !ranges.transientRanges().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.transientRanges()) : t -> false; while (ci.hasNext()) { try (UnfilteredRowIterator partition = ci.next()) { - // if current range from sstable is repaired, save it into the new repaired sstable - if (containmentChecker.contains(partition.partitionKey().getToken())) + Token token = partition.partitionKey().getToken(); + // if this row is contained in the full or transient ranges, append it to the appropriate sstable + if (fullChecker.test(token)) { - repairedSSTableWriter.append(partition); - repairedKeyCount++; + fullWriter.append(partition); + } + else if (transChecker.test(token)) + { + transWriter.append(partition); } - // otherwise save into the new 'non-repaired' table else { - unRepairedSSTableWriter.append(partition); - unrepairedKeyCount++; + // otherwise, append it to the unrepaired sstable + unrepairedWriter.append(partition); } } } List<SSTableReader> anticompactedSSTables = new ArrayList<>(); - // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, + // since all writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method, // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted. - anticompactionGroup.permitRedundantTransitions(); - repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit(); - unRepairedSSTableWriter.prepareToCommit(); - anticompactedSSTables.addAll(repairedSSTableWriter.finished()); - anticompactedSSTables.addAll(unRepairedSSTableWriter.finished()); - repairedSSTableWriter.commit(); - unRepairedSSTableWriter.commit(); - - logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount, - repairedKeyCount + unrepairedKeyCount, - cfs.keyspace.getName(), - cfs.getTableName(), - anticompactionGroup); + txn.permitRedundantTransitions(); + + fullWriter.prepareToCommit(); + transWriter.prepareToCommit(); + unrepairedWriter.prepareToCommit(); + + anticompactedSSTables.addAll(fullWriter.finished()); + anticompactedSSTables.addAll(transWriter.finished()); + anticompactedSSTables.addAll(unrepairedWriter.finished()); + + fullWriter.commit(); + transWriter.commit(); + unrepairedWriter.commit(); + return anticompactedSSTables.size(); } catch (Throwable e) { JVMStabilityInspector.inspectThrowable(e); - logger.error("Error anticompacting " + anticompactionGroup, e); + logger.error("Error anticompacting " + txn, e); } return 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java index 8fba121..8ce93fa 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SerializationHeader; @@ -71,11 +72,19 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder } @Override - public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair) + public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient) { - Preconditions.checkArgument(!isPendingRepair || !isRepaired, - "SSTables cannot be both repaired and pending repair"); - return !isPendingRepair && (isRepaired == this.isRepaired); + if (!isPendingRepair) + { + Preconditions.checkArgument(!isTransient, "isTransient can only be true for sstables pending repairs"); + return this.isRepaired == isRepaired; + } + else + { + Preconditions.checkArgument(!isRepaired, "SSTables cannot be both repaired and pending repair"); + return false; + + } } @Override @@ -206,7 +215,15 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder } @Override - public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) + public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, + long keyCount, + long repairedAt, + UUID pendingRepair, + boolean isTransient, + MetadataCollector collector, + SerializationHeader header, + Collection<Index> indexes, + LifecycleTransaction txn) { if (isRepaired) { @@ -226,6 +243,7 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder keyCount, repairedAt, pendingRepair, + isTransient, collector, header, indexes, @@ -237,4 +255,10 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder { return strategies.indexOf(strategy); } + + @Override + public boolean containsSSTable(SSTableReader sstable) + { + return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org