http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 9766454..afe628b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -56,6 +56,7 @@ import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; @@ -112,6 +113,7 @@ public class CompactionStrategyManager implements INotificationConsumer /** * Variables guarded by read and write lock above */ + private final PendingRepairHolder transientRepairs; private final PendingRepairHolder pendingRepairs; private final CompactionStrategyHolder repaired; private final CompactionStrategyHolder unrepaired; @@ -156,10 +158,11 @@ public class CompactionStrategyManager implements INotificationConsumer return compactionStrategyIndexForDirectory(descriptor); } }; - pendingRepairs = new PendingRepairHolder(cfs, router); + transientRepairs = new PendingRepairHolder(cfs, router, true); + pendingRepairs = new PendingRepairHolder(cfs, router, false); repaired = new CompactionStrategyHolder(cfs, router, true); unrepaired = new CompactionStrategyHolder(cfs, router, false); - holders = ImmutableList.of(pendingRepairs, repaired, unrepaired); + holders = ImmutableList.of(transientRepairs, pendingRepairs, repaired, unrepaired); 
cfs.getTracker().subscribe(this); logger.trace("{} subscribed to the data tracker.", this); @@ -176,7 +179,6 @@ public class CompactionStrategyManager implements INotificationConsumer * Return the next background task * * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks) - * */ public AbstractCompactionTask getNextBackgroundTask(int gcBefore) { @@ -188,18 +190,16 @@ public class CompactionStrategyManager implements INotificationConsumer return null; int numPartitions = getNumTokenPartitions(); + // first try to promote/demote sstables from completed repairs - List<TaskSupplier> repairFinishedSuppliers = pendingRepairs.getRepairFinishedTaskSuppliers(); - if (!repairFinishedSuppliers.isEmpty()) - { - Collections.sort(repairFinishedSuppliers); - for (TaskSupplier supplier : repairFinishedSuppliers) - { - AbstractCompactionTask task = supplier.getTask(); - if (task != null) - return task; - } - } + AbstractCompactionTask repairFinishedTask; + repairFinishedTask = pendingRepairs.getNextRepairFinishedTask(); + if (repairFinishedTask != null) + return repairFinishedTask; + + repairFinishedTask = transientRepairs.getNextRepairFinishedTask(); + if (repairFinishedTask != null) + return repairFinishedTask; // sort compaction task suppliers by remaining tasks descending List<TaskSupplier> suppliers = new ArrayList<>(numPartitions * holders.size()); @@ -393,64 +393,28 @@ public class CompactionStrategyManager implements INotificationConsumer } } - - @VisibleForTesting - List<AbstractCompactionStrategy> getRepaired() + CompactionStrategyHolder getRepairedUnsafe() { - readLock.lock(); - try - { - return Lists.newArrayList(repaired.allStrategies()); - } - finally - { - readLock.unlock(); - } + return repaired; } @VisibleForTesting - List<AbstractCompactionStrategy> getUnrepaired() + CompactionStrategyHolder getUnrepairedUnsafe() { - readLock.lock(); - try - { - return Lists.newArrayList(unrepaired.allStrategies()); - } - finally - { 
- readLock.unlock(); - } + return unrepaired; } @VisibleForTesting - Iterable<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID) + PendingRepairHolder getPendingRepairsUnsafe() { - readLock.lock(); - try - { - return pendingRepairs.getStrategiesFor(sessionID); - } - finally - { - readLock.unlock(); - } + return pendingRepairs; } @VisibleForTesting - Set<UUID> pendingRepairs() + PendingRepairHolder getTransientRepairsUnsafe() { - readLock.lock(); - try - { - Set<UUID> ids = new HashSet<>(); - pendingRepairs.getManagers().forEach(p -> ids.addAll(p.getSessions())); - return ids; - } - finally - { - readLock.unlock(); - } + return transientRepairs; } public boolean hasDataForPendingRepair(UUID sessionID) @@ -458,8 +422,7 @@ public class CompactionStrategyManager implements INotificationConsumer readLock.lock(); try { - return Iterables.any(pendingRepairs.getManagers(), - prm -> prm.hasDataForSession(sessionID)); + return pendingRepairs.hasDataForSession(sessionID) || transientRepairs.hasDataForSession(sessionID); } finally { @@ -682,18 +645,19 @@ public class CompactionStrategyManager implements INotificationConsumer throw new IllegalStateException("No holder claimed " + sstable); } - private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair) + private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair, boolean isTransient) { return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE, - pendingRepair != ActiveRepairService.NO_PENDING_REPAIR); + pendingRepair != ActiveRepairService.NO_PENDING_REPAIR, + isTransient); } @VisibleForTesting - AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair) + AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient) { for (AbstractStrategyHolder holder : holders) { - if (holder.managesRepairedGroup(isRepaired, isPendingRepair)) + if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient)) return 
holder; } @@ -1146,16 +1110,26 @@ public class CompactionStrategyManager implements INotificationConsumer long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn) { + SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient); maybeReloadDiskBoundaries(); readLock.lock(); try { - return getHolder(repairedAt, pendingRepair).createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, indexes, txn); + return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor, + keyCount, + repairedAt, + pendingRepair, + isTransient, + collector, + header, + indexes, + txn); } finally { @@ -1220,7 +1194,7 @@ public class CompactionStrategyManager implements INotificationConsumer * Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races * with other processes between when the metadata is changed and when sstables are moved between strategies. */ - public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair) throws IOException + public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException { Set<SSTableReader> changed = new HashSet<>(); @@ -1229,7 +1203,7 @@ public class CompactionStrategyManager implements INotificationConsumer { for (SSTableReader sstable: sstables) { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient); sstable.reloadSSTableMetadata(); changed.add(sstable); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 662384c..591b7c4 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -29,20 +29,19 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; - -import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; -import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; @@ -339,6 +338,23 @@ public class CompactionTask extends AbstractCompactionTask return ids.iterator().next(); } + public static boolean getIsTransient(Set<SSTableReader> sstables) + { + if 
(sstables.isEmpty()) + { + return false; + } + + boolean isTransient = sstables.iterator().next().isTransient(); + + if (!Iterables.all(sstables, sstable -> sstable.isTransient() == isTransient)) + { + throw new RuntimeException("Attempting to compact transient sstables with non transient sstables"); + } + + return isTransient; + } + /* * Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java index 7b9123f..92e44a7 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -43,10 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService; public class PendingRepairHolder extends AbstractStrategyHolder { private final List<PendingRepairManager> managers = new ArrayList<>(); + private final boolean isTransient; - public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router) + public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient) { super(cfs, router); + this.isTransient = isTransient; } @Override @@ -66,15 +69,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder { managers.clear(); for (int i = 0; i < numTokenPartitions; i++) - managers.add(new PendingRepairManager(cfs, params)); + managers.add(new PendingRepairManager(cfs, params, isTransient)); } @Override - public boolean 
managesRepairedGroup(boolean isRepaired, boolean isPendingRepair) + public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient) { Preconditions.checkArgument(!isPendingRepair || !isRepaired, "SSTables cannot be both repaired and pending repair"); - return isPendingRepair; + return isPendingRepair && (this.isTransient == isTransient); } @Override @@ -145,7 +148,23 @@ public class PendingRepairHolder extends AbstractStrategyHolder return tasks; } - public ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers() + AbstractCompactionTask getNextRepairFinishedTask() + { + List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers(); + if (!repairFinishedSuppliers.isEmpty()) + { + Collections.sort(repairFinishedSuppliers); + for (TaskSupplier supplier : repairFinishedSuppliers) + { + AbstractCompactionTask task = supplier.getTask(); + if (task != null) + return task; + } + } + return null; + } + + private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers() { ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size()); for (PendingRepairManager manager : managers) @@ -218,6 +237,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, @@ -233,6 +253,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder keyCount, repairedAt, pendingRepair, + isTransient, collector, header, indexes, @@ -249,4 +270,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder } return -1; } + + public boolean hasDataForSession(UUID sessionID) + { + return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID)); + } + + @Override + public boolean containsSSTable(SSTableReader sstable) + { + return Iterables.any(managers, prm -> prm.containsSSTable(sstable)); + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index edc9a2f..6763abf 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -30,7 +30,9 @@ import java.util.UUID; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -62,6 +64,7 @@ class PendingRepairManager private final ColumnFamilyStore cfs; private final CompactionParams params; + private final boolean isTransient; private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of(); /** @@ -75,10 +78,11 @@ class PendingRepairManager } } - PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params) + PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params, boolean isTransient) { this.cfs = cfs; this.params = params; + this.isTransient = isTransient; } private ImmutableMap.Builder<UUID, AbstractCompactionStrategy> mapBuilder() @@ -156,6 +160,7 @@ class PendingRepairManager synchronized void addSSTable(SSTableReader sstable) { + Preconditions.checkArgument(sstable.isTransient() == isTransient); getOrCreate(sstable).addSSTable(sstable); } @@ -389,6 +394,15 @@ class PendingRepairManager return strategies.keySet().contains(sessionID); } + boolean containsSSTable(SSTableReader sstable) + { + if (!sstable.isPendingRepair()) + return false; + + AbstractCompactionStrategy strategy = 
strategies.get(sstable.getPendingRepair()); + return strategy != null && strategy.getSSTables().contains(sstable); + } + public Collection<AbstractCompactionTask> createUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore) { Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair)); @@ -419,18 +433,35 @@ class PendingRepairManager protected void runMayThrow() throws Exception { boolean completed = false; + boolean obsoleteSSTables = isTransient && repairedAt > 0; try { - logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID); - cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR); + if (obsoleteSSTables) + { + logger.info("Obsoleting transient repaired sstables"); + Preconditions.checkState(Iterables.all(transaction.originals(), SSTableReader::isTransient)); + transaction.obsoleteOriginals(); + } + else + { + logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID); + cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false); + } completed = true; } finally { - // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll - // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other - // compactions from marking these sstables compacting, and unmarking them when we're done - transaction.abort(); + if (obsoleteSSTables) + { + transaction.finish(); + } + else + { + // we abort here because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll + // anything back. Also, we don't want to obsolete the originals. 
We're only using it to prevent other + // compactions from marking these sstables compacting, and unmarking them when we're done + transaction.abort(); + } if (completed) { removeSession(sessionID); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index f97b693..aa41051 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -170,7 +170,7 @@ public class Scrubber implements Closeable } StatsMetadata metadata = sstable.getSSTableMetadata(); - writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, transaction)); + writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction)); DecoratedKey prevKey = null; @@ -277,7 +277,7 @@ public class Scrubber implements Closeable // out of order rows, but no bad rows found - we can keep our repairedAt time long repairedAt = badRows > 0 ? 
ActiveRepairService.UNREPAIRED_SSTABLE : metadata.repairedAt; SSTableReader newInOrderSstable; - try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, sstable, transaction)) + try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction)) { for (Partition partition : outOfOrder) inOrderWriter.append(partition.unfilteredIterator()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index 80453ef..e1406aa 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -68,14 +68,15 @@ public class Upgrader this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } - private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair) + private SSTableWriter createCompactionWriter(StatsMetadata metadata) { MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator()); sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel()); return SSTableWriter.create(cfs.newSSTableDescriptor(directory), estimatedRows, - repairedAt, - parentRepair, + metadata.repairedAt, + metadata.pendingRepair, + metadata.isTransient, cfs.metadata, sstableMetadataCollector, SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)), @@ -91,8 +92,7 @@ public class Upgrader AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals()); CompactionIterator iter = new CompactionIterator(transaction.opType(), 
scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())) { - StatsMetadata metadata = sstable.getSSTableMetadata(); - writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair)); + writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata())); while (iter.hasNext()) writer.append(iter.next()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index db49369..446d527 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -350,6 +350,7 @@ public class Verifier implements Closeable public RangeOwnHelper(List<Range<Token>> normalizedRanges) { this.normalizedRanges = normalizedRanges; + Range.assertNormalized(normalizedRanges); } /** @@ -457,7 +458,7 @@ public class Verifier implements Closeable { try { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient()); sstable.reloadSSTableMetadata(); cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java index 5ddd99c..d72b236 100644 --- 
a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java @@ -57,6 +57,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa protected final long maxAge; protected final long minRepairedAt; protected final UUID pendingRepair; + protected final boolean isTransient; protected final SSTableRewriter sstableWriter; protected final LifecycleTransaction txn; @@ -91,6 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge); minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables); pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables); + isTransient = CompactionTask.getIsTransient(nonExpiredSSTables); DiskBoundaries db = cfs.getDiskBoundaries(); diskBoundaries = db.positions; locations = db.directories; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index cda7e38..6180f96 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -72,6 +72,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter estimatedTotalKeys, minRepairedAt, pendingRepair, + isTransient, cfs.metadata, new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 3959b4b..2b93eb4 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -108,6 +108,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter keysPerSSTable, minRepairedAt, pendingRepair, + isTransient, cfs.metadata, new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel), SerializationHeader.make(cfs.metadata(), txn.originals()), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index c4f84e8..df7eeaf 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -111,6 +111,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter estimatedTotalKeys / estimatedSSTables, minRepairedAt, pendingRepair, + isTransient, cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata().comparator, level), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index a4af783..7533f1d 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ -107,6 +107,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter currentPartitionsToWrite, minRepairedAt, pendingRepair, + isTransient, cfs.metadata, new MetadataCollector(allSSTables, cfs.metadata().comparator, 0), SerializationHeader.make(cfs.metadata(), nonExpiredSSTables), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java index 9064b0f..bed0958 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -82,18 +82,6 @@ public abstract class PartitionIterators return new SingletonPartitionIterator(iterator); } - public static void consume(PartitionIterator iterator) - { - while (iterator.hasNext()) - { - try (RowIterator partition = iterator.next()) - { - while (partition.hasNext()) - partition.next(); - } - } - } - /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. 
* <p> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java index 5f2e5a0..fa2e653 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java @@ -26,8 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.repair.KeyspaceRepairManager; public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager @@ -40,9 +39,12 @@ public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager } @Override - public ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + public ListenableFuture prepareIncrementalRepair(UUID sessionID, + Collection<ColumnFamilyStore> tables, + RangesAtEndpoint tokenRanges, + ExecutorService executor) { - PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, ranges, executor); + PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, tokenRanges, executor); return pac.run(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java 
b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java index 4e0f13d..a205c3c 100644 --- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java +++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java @@ -43,6 +43,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.apache.cassandra.utils.concurrent.Refs; /** @@ -126,17 +127,17 @@ public class PendingAntiCompaction static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object> { private final UUID parentRepairSession; - private final Collection<Range<Token>> ranges; + private final RangesAtEndpoint tokenRanges; - public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) + public AcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint tokenRanges) { this.parentRepairSession = parentRepairSession; - this.ranges = ranges; + this.tokenRanges = tokenRanges; } ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result) { - return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession); + return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, result.refs, result.txn, parentRepairSession); } public ListenableFuture apply(List<AcquireResult> results) throws Exception @@ -177,14 +178,17 @@ public class PendingAntiCompaction private final UUID prsId; private final Collection<ColumnFamilyStore> tables; - private final Collection<Range<Token>> ranges; + private final RangesAtEndpoint tokenRanges; private final ExecutorService executor; - public PendingAntiCompaction(UUID prsId, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + public 
PendingAntiCompaction(UUID prsId, + Collection<ColumnFamilyStore> tables, + RangesAtEndpoint tokenRanges, + ExecutorService executor) { this.prsId = prsId; this.tables = tables; - this.ranges = ranges; + this.tokenRanges = tokenRanges; this.executor = executor; } @@ -194,12 +198,12 @@ public class PendingAntiCompaction for (ColumnFamilyStore cfs : tables) { cfs.forceBlockingFlush(); - ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId)); + ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, tokenRanges.ranges(), prsId)); executor.submit(task); tasks.add(task); } ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks); - ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, ranges), MoreExecutors.directExecutor()); + ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor()); return compactionResult; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index 5252187..c688fdf 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -68,17 +68,18 @@ public class CassandraOutgoingFile implements OutgoingStream private final ComponentManifest manifest; private Boolean isFullyContained; - private final List<Range<Token>> ranges; + private final List<Range<Token>> normalizedRanges; public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, - 
List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges, + List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges, long estimatedKeys) { Preconditions.checkNotNull(ref.get()); + Range.assertNormalized(normalizedRanges); this.ref = ref; this.estimatedKeys = estimatedKeys; this.sections = sections; - this.ranges = ImmutableList.copyOf(ranges); + this.normalizedRanges = ImmutableList.copyOf(normalizedRanges); this.filename = ref.get().getFilename(); this.manifest = getComponentManifest(ref.get()); @@ -194,7 +195,7 @@ public class CassandraOutgoingFile implements OutgoingStream .getCompactionStrategyFor(ref.get()); if (compactionStrategy instanceof LeveledCompactionStrategy) - return contained(ranges, ref.get()); + return contained(normalizedRanges, ref.get()); return false; } @@ -251,6 +252,6 @@ public class CassandraOutgoingFile implements OutgoingStream @Override public String toString() { - return "CassandraOutgoingFile{" + ref.get().getFilename() + '}'; + return "CassandraOutgoingFile{" + filename + '}'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java index 43667d0..6c2631c 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java @@ -18,19 +18,10 @@ package org.apache.cassandra.db.streaming; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.UUID; - import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; 
-import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; @@ -39,6 +30,8 @@ import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.IncomingStream; import org.apache.cassandra.streaming.OutgoingStream; @@ -49,6 +42,14 @@ import org.apache.cassandra.streaming.TableStreamManager; import org.apache.cassandra.streaming.messages.StreamMessageHeader; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.Refs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.UUID; /** * Implements the streaming interface for the native cassandra storage engine. 
@@ -96,14 +97,14 @@ public class CassandraStreamManager implements TableStreamManager } @Override - public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind) + public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind) { Refs<SSTableReader> refs = new Refs<>(); try { - final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size()); - for (Range<Token> range : ranges) - keyRanges.add(Range.makeRowRange(range)); + final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(replicas.size()); + for (Replica replica : replicas) + keyRanges.add(Range.makeRowRange(replica.range())); refs.addAll(cfs.selectAndReference(view -> { Set<SSTableReader> sstables = Sets.newHashSet(); SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); @@ -141,11 +142,16 @@ public class CassandraStreamManager implements TableStreamManager }).refs); + List<Range<Token>> normalizedFullRanges = Range.normalize(replicas.filter(Replica::isFull).ranges()); + List<Range<Token>> normalizedAllRanges = Range.normalize(replicas.ranges()); + //Create outgoing file streams for ranges possibly skipping repaired ranges in sstables List<OutgoingStream> streams = new ArrayList<>(refs.size()); - for (SSTableReader sstable: refs) + for (SSTableReader sstable : refs) { - Ref<SSTableReader> ref = refs.get(sstable); + List<Range<Token>> ranges = sstable.isRepaired() ? 
normalizedFullRanges : normalizedAllRanges; List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges); + + Ref<SSTableReader> ref = refs.get(sstable); if (sections.isEmpty()) { ref.release(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index fccabfe..572c648 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -156,7 +156,7 @@ public class CassandraStreamReader implements IStreamReader Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); return writer; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java index d35457e..09490e8 100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -60,6 +60,11 @@ public class TableViews extends AbstractCollection<View> baseTableMetadata = 
Schema.instance.getTableMetadataRef(id); } + public boolean hasViews() + { + return !views.isEmpty(); + } + public int size() { return views.size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java index c727f63..6717297 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java @@ -43,6 +43,8 @@ import org.apache.cassandra.db.compaction.CompactionInterruptedException; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; @@ -135,14 +137,15 @@ class ViewBuilder } // Get the local ranges for which the view hasn't already been built nor it's building - Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName) - .stream() - .map(r -> r.subtractAll(builtRanges)) - .flatMap(Set::stream) - .map(r -> r.subtractAll(pendingRanges.keySet())) - .flatMap(Set::stream) - .collect(Collectors.toSet()); - + RangesAtEndpoint replicatedRanges = StorageService.instance.getLocalReplicas(ksName); + Replicas.temporaryAssertFull(replicatedRanges); + Set<Range<Token>> newRanges = replicatedRanges.ranges() + .stream() + .map(r -> r.subtractAll(builtRanges)) + .flatMap(Set::stream) + .map(r -> r.subtractAll(pendingRanges.keySet())) + .flatMap(Set::stream) + .collect(Collectors.toSet()); // If there are no new nor pending ranges we should finish the build if (newRanges.isEmpty() && 
pendingRanges.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index 000477d..7e3ea1b 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -79,7 +79,7 @@ public class ViewManager { assert keyspace.getName().equals(update.metadata().keyspace); - if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1) + if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor().allReplicas == 1) continue; if (!forTable(update.metadata().id).updatedViews(update).isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index df16943..ad10d9d 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -18,16 +18,17 @@ package org.apache.cassandra.db.view; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; +import java.util.function.Predicate; +import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.locator.Replica; import 
org.apache.cassandra.utils.FBUtilities; public final class ViewUtils @@ -58,46 +59,51 @@ public final class ViewUtils * * @return Optional.empty() if this method is called using a base token which does not belong to this replica */ - public static Optional<InetAddressAndPort> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) + public static Optional<Replica> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) { AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); - List<InetAddressAndPort> baseEndpoints = new ArrayList<>(); - List<InetAddressAndPort> viewEndpoints = new ArrayList<>(); - for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken)) - { - // An endpoint is local if we're not using Net - if (!(replicationStrategy instanceof NetworkTopologyStrategy) || - DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter)) - baseEndpoints.add(baseEndpoint); - } + EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken); + EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken); - for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken)) - { - // If we are a base endpoint which is also a view replica, we use ourselves as our view replica - if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort())) - return Optional.of(viewEndpoint); + Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil(); + if (localReplica.isPresent()) + return localReplica; - // We have to remove any endpoint which is shared between the base and the view, as it will select itself - // and throw off the counts otherwise. 
- if (baseEndpoints.contains(viewEndpoint)) - baseEndpoints.remove(viewEndpoint); - else if (!(replicationStrategy instanceof NetworkTopologyStrategy) || - DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter)) - viewEndpoints.add(viewEndpoint); - } + // We only select replicas from our own DC + // TODO: this is poor encapsulation, leaking implementation details of replication strategy + Predicate<Replica> isLocalDC = r -> !(replicationStrategy instanceof NetworkTopologyStrategy) + || DatabaseDescriptor.getEndpointSnitch().getDatacenter(r).equals(localDataCenter); + + // We have to remove any endpoint which is shared between the base and the view, as it will select itself + // and throw off the counts otherwise. + EndpointsForToken baseReplicas = naturalBaseReplicas.filter( + r -> !naturalViewReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r) + ); + EndpointsForToken viewReplicas = naturalViewReplicas.filter( + r -> !naturalBaseReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r) + ); // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace. // Since the same replication strategy is used, the same placement should be used and we should get the same // number of replicas for all of the tokens in the ring. 
- assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view"; - int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort()); + assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view"; + + int baseIdx = -1; + for (int i=0; i<baseReplicas.size(); i++) + { + if (baseReplicas.get(i).isLocal()) + { + baseIdx = i; + break; + } + } if (baseIdx < 0) //This node is not a base replica of this key, so we return empty return Optional.empty(); - return Optional.of(viewEndpoints.get(baseIdx)); + return Optional.of(viewReplicas.get(baseIdx)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Range.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java index 974d08e..e03c5ec 100644 --- a/src/java/org/apache/cassandra/dht/Range.java +++ b/src/java/org/apache/cassandra/dht/Range.java @@ -19,6 +19,7 @@ package org.apache.cassandra.dht; import java.io.Serializable; import java.util.*; +import java.util.function.Predicate; import org.apache.commons.lang3.ObjectUtils; @@ -529,7 +530,7 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen /** * Helper class to check if a token is contained within a given collection of ranges */ - public static class OrderedRangeContainmentChecker + public static class OrderedRangeContainmentChecker implements Predicate<Token> { private final Iterator<Range<Token>> normalizedRangesIterator; private Token lastToken = null; @@ -550,7 +551,8 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen * @param t token to check, must be larger than or equal to the last token passed * @return true if the token is contained within the ranges given to 
the constructor. */ - public boolean contains(Token t) + @Override + public boolean test(Token t) { assert lastToken == null || lastToken.compareTo(t) <= 0; lastToken = t; @@ -567,4 +569,25 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen } } } + + public static <T extends RingPosition<T>> void assertNormalized(List<Range<T>> ranges) + { + Range<T> lastRange = null; + for (Range<T> range : ranges) + { + if (lastRange == null) + { + lastRange = range; + } + else if (lastRange.left.compareTo(range.left) >= 0 || lastRange.intersects(range)) + { + throw new AssertionError(String.format("Ranges aren't properly normalized. lastRange %s, range %s, compareTo %d, intersects %b, all ranges %s%n", + lastRange, + range, + lastRange.compareTo(range), + lastRange.intersects(range), + ranges)); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java index b90bc96..4b98b97 100644 --- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java +++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java @@ -19,25 +19,27 @@ package org.apache.cassandra.dht; import java.math.BigInteger; -import java.net.InetAddress; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; -import java.util.List; import java.util.Set; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.locator.EndpointsByRange; +import 
org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.locator.Replica; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.Replicas; import org.psjava.algo.graph.flownetwork.FordFulkersonAlgorithm; import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithm; import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithmResult; @@ -73,20 +75,20 @@ public class RangeFetchMapCalculator { private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class); private static final long TRIVIAL_RANGE_LIMIT = 1000; - private final Multimap<Range<Token>, InetAddressAndPort> rangesWithSources; - private final Collection<RangeStreamer.ISourceFilter> sourceFilters; + private final EndpointsByRange rangesWithSources; + private final Predicate<Replica> sourceFilters; private final String keyspace; //We need two Vertices to act as source and destination in the algorithm private final Vertex sourceVertex = OuterVertex.getSourceVertex(); private final Vertex destinationVertex = OuterVertex.getDestinationVertex(); private final Set<Range<Token>> trivialRanges; - public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, - Collection<RangeStreamer.ISourceFilter> sourceFilters, + public RangeFetchMapCalculator(EndpointsByRange rangesWithSources, + Collection<Predicate<Replica>> sourceFilters, String keyspace) { this.rangesWithSources = rangesWithSources; - this.sourceFilters = sourceFilters; + this.sourceFilters = Predicates.and(sourceFilters); this.keyspace = keyspace; this.trivialRanges = rangesWithSources.keySet() .stream() @@ -158,14 +160,15 @@ public class RangeFetchMapCalculator boolean localDCCheck = true; while (!added) { - List<InetAddressAndPort> srcs = new 
ArrayList<>(rangesWithSources.get(trivialRange)); // sort with the endpoint having the least number of streams first: - srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size())); - for (InetAddressAndPort src : srcs) + EndpointsForRange replicas = rangesWithSources.get(trivialRange) + .sorted(Comparator.comparingInt(o -> optimisedMap.get(o.endpoint()).size())); + Replicas.temporaryAssertFull(replicas); + for (Replica replica : replicas) { - if (passFilters(src, localDCCheck)) + if (passFilters(replica, localDCCheck)) { - fetchMap.put(src, trivialRange); + fetchMap.put(replica.endpoint(), trivialRange); added = true; break; } @@ -347,15 +350,16 @@ public class RangeFetchMapCalculator private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph, RangeVertex rangeVertex, boolean localDCCheck) { boolean sourceFound = false; - for (InetAddressAndPort endpoint : rangesWithSources.get(rangeVertex.getRange())) + Replicas.temporaryAssertFull(rangesWithSources.get(rangeVertex.getRange())); + for (Replica replica : rangesWithSources.get(rangeVertex.getRange())) { - if (passFilters(endpoint, localDCCheck)) + if (passFilters(replica, localDCCheck)) { sourceFound = true; // if we pass filters, it means that we don't filter away localhost and we can count it as a source: - if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) + if (replica.isLocal()) continue; // but don't add localhost to the graph to avoid streaming locally - final Vertex endpointVertex = new EndpointVertex(endpoint); + final Vertex endpointVertex = new EndpointVertex(replica.endpoint()); capacityGraph.insertVertex(rangeVertex); capacityGraph.insertVertex(endpointVertex); capacityGraph.addEdge(rangeVertex, endpointVertex, Integer.MAX_VALUE); @@ -364,26 +368,20 @@ public class RangeFetchMapCalculator return sourceFound; } - private boolean isInLocalDC(InetAddressAndPort endpoint) + private boolean isInLocalDC(Replica replica) { - return 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint)); + return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica)); } /** * - * @param endpoint Endpoint to check + * @param replica Replica to check * @param localDCCheck Allow endpoints with local DC * @return True if filters pass this endpoint */ - private boolean passFilters(final InetAddressAndPort endpoint, boolean localDCCheck) + private boolean passFilters(final Replica replica, boolean localDCCheck) { - for (RangeStreamer.ISourceFilter filter : sourceFilters) - { - if (!filter.shouldInclude(endpoint)) - return false; - } - - return !localDCCheck || isInLocalDC(endpoint); + return sourceFilters.apply(replica) && (!localDCCheck || isInLocalDC(replica)); } private static abstract class Vertex --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org