Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d387f5e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d387f5e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d387f5e Branch: refs/heads/trunk Commit: 1d387f5e7f688150c09b7eb14a036d153017ec02 Parents: 1a2eb5e 684120d Author: Marcus Eriksson <[email protected]> Authored: Tue Apr 24 08:56:34 2018 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Tue Apr 24 08:56:34 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 80 +++++++------- .../db/compaction/AntiCompactionTest.java | 105 ++++++++++++++++++- .../org/apache/cassandra/schema/MockSchema.java | 16 ++- 4 files changed, 161 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 6976c7f,5450322..784fa2b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -261,10 -30,12 +261,11 @@@ Merged from 3.0 * Fully utilise specified compaction threads (CASSANDRA-14210) * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763) Merged from 2.2: + * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411) * Fix JSON queries with IN restrictions and ORDER BY clause (CASSANDRA-14286) - * Backport circleci yaml (CASSANDRA-14240) + * CQL fromJson(null) throws NullPointerException (CASSANDRA-13891) Merged from 2.1: * Check checksum before decompressing data (CASSANDRA-14284) - * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183) 3.11.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 5672dfe,f0a4de5..831d8ca --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@@ -648,63 -645,67 +648,30 @@@ public class CompactionManager implemen Refs<SSTableReader> validatedForRepair, LifecycleTransaction txn, long repairedAt, + UUID pendingRepair, UUID parentRepairSession) throws InterruptedException, IOException { - logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables()); - logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges); - Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); - // we should only notify that repair status changed if it actually did: - Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>(); - Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>(); - for (SSTableReader sstable : sstables) - wasRepairedBefore.put(sstable, sstable.isRepaired()); - - Set<SSTableReader> nonAnticompacting = new HashSet<>(); - - Iterator<SSTableReader> sstableIterator = sstables.iterator(); try { - List<Range<Token>> normalizedRanges = Range.normalize(ranges); - - while (sstableIterator.hasNext()) - { - SSTableReader sstable = 
sstableIterator.next(); + ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession); + Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews"); - Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); + logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size()); + logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges); + Set<SSTableReader> sstables = new HashSet<>(validatedForRepair); - Set<SSTableReader> nonAnticompacting = new HashSet<>(); - - boolean shouldAnticompact = false; + Iterator<SSTableReader> sstableIterator = sstables.iterator(); + List<Range<Token>> normalizedRanges = Range.normalize(ranges); - Set<SSTableReader> fullyContainedSSTables = new HashSet<>(); - - while (sstableIterator.hasNext()) - { - SSTableReader sstable = sstableIterator.next(); - Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken()); - for (Range<Token> r : normalizedRanges) - { - if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right)) - { - logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r); - sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt); - sstable.reloadSSTableMetadata(); - mutatedRepairStatuses.add(sstable); - if (!wasRepairedBefore.get(sstable)) - mutatedRepairStatusToNotify.add(sstable); - sstableIterator.remove(); - shouldAnticompact = true; - break; - } - else if (r.intersects(sstableBounds)) - { - logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, r); - shouldAnticompact = true; - } - } ++ Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, parentRepairSession); - boolean shouldAnticompact = false; - - for (Range<Token> r : normalizedRanges) - { - if (r.contains(sstableRange)) - { - logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r); - fullyContainedSSTables.add(sstable); - sstableIterator.remove(); - shouldAnticompact = true; - break; - } - else if (sstableRange.intersects(r)) - { - logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, r); - shouldAnticompact = true; - } - } - -- if (!shouldAnticompact) -- { - logger.info("{} SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableRange, normalizedRanges); - logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges); -- nonAnticompacting.add(sstable); -- sstableIterator.remove(); -- } -- } - cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify); - txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses)); - validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses)); + 
cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables)); + cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt, pendingRepair); - txn.cancel(Sets.union(nonAnticompacting, fullyContainedSSTables)); - validatedForRepair.release(Sets.union(nonAnticompacting, fullyContainedSSTables)); ++ txn.cancel(fullyContainedSSTables); ++ validatedForRepair.release(fullyContainedSSTables); assert txn.originals().equals(sstables); if (!sstables.isEmpty()) - doAntiCompaction(cfs, ranges, txn, repairedAt); + doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair); txn.finish(); } finally @@@ -713,9 -714,9 +680,50 @@@ txn.close(); } - logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession); + logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(parentRepairSession)); + } + ++ @VisibleForTesting ++ static Set<SSTableReader> findSSTablesToAnticompact(Iterator<SSTableReader> sstableIterator, List<Range<Token>> normalizedRanges, UUID parentRepairSession) ++ { ++ Set<SSTableReader> fullyContainedSSTables = new HashSet<>(); ++ while (sstableIterator.hasNext()) ++ { ++ SSTableReader sstable = sstableIterator.next(); ++ ++ Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken()); ++ ++ boolean shouldAnticompact = false; ++ ++ for (Range<Token> r : normalizedRanges) ++ { ++ // ranges are normalized - no wrap around - if first and last are contained we know that all tokens are contained in the range ++ if (r.contains(sstable.first.getToken()) && r.contains(sstable.last.getToken())) ++ { ++ logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r); ++ fullyContainedSSTables.add(sstable); ++ sstableIterator.remove(); ++ shouldAnticompact = true; ++ break; ++ } ++ else if (r.intersects(sstableBounds)) ++ { ++ logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, r); ++ shouldAnticompact = true; ++ } ++ } ++ ++ if (!shouldAnticompact) ++ { ++ // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here ++ String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, normalizedRanges); ++ logger.error(message); ++ throw new IllegalStateException(message); ++ } ++ } ++ return fullyContainedSSTables; + } + public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput) { FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()), splitOutput)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 6e7e184,841a22e..bda05af --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@@ -19,24 -19,20 +19,30 @@@ package org.apache.cassandra.db.compact import java.io.File; import java.io.IOException; ++import java.util.ArrayList; import 
java.util.Arrays; import java.util.Collection; ++import java.util.Collections; ++import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.RateLimiter; +import com.google.common.collect.Lists; ++import com.google.common.collect.Sets; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; -import org.apache.cassandra.config.CFMetaData; ++import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.schema.MockSchema; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.EncodingStats; @@@ -310,10 -274,9 +316,11 @@@ public class AntiCompactionTes ColumnFamilyStore store = prepareColumnFamilyStore(); Collection<SSTableReader> sstables = getUnrepairedSSTables(store); assertEquals(store.getLiveSSTables().size(), sstables.size()); - Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes())); ++ // the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47 + Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("9999".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); - UUID parentRepairSession = UUID.randomUUID(); + UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair; + registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair); try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) @@@ -351,22 -302,21 +358,29 @@@ { generateSStable(store,Integer.toString(table)); } ++ int refCountBefore = Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(); Collection<SSTableReader> sstables = getUnrepairedSSTables(store); assertEquals(store.getLiveSSTables().size(), sstables.size()); Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes())); List<Range<Token>> ranges = Arrays.asList(range); UUID parentRepairSession = UUID.randomUUID(); - + registerParentRepairSession(parentRepairSession, ranges, UNREPAIRED_SSTABLE, null); - ++ boolean gotException = false; try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); Refs<SSTableReader> refs = Refs.ref(sstables)) { - CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession); + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession); + } ++ catch (IllegalStateException e) ++ { ++ gotException = true; + } ++ assertTrue(gotException); assertThat(store.getLiveSSTables().size(), is(10)); assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false)); ++ assertEquals(refCountBefore, Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount()); } private ColumnFamilyStore prepareColumnFamilyStore() @@@ -399,42 -349,5 +413,129 @@@ return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired())); } + /** + * If the parent repair session is 
missing, we should still clean up + */ + @Test + public void missingParentRepairSession() throws Exception + { + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF); + store.disableAutoCompaction(); + + for (int table = 0; table < 10; table++) + { + generateSStable(store,Integer.toString(table)); + } + Collection<SSTableReader> sstables = getUnrepairedSSTables(store); + assertEquals(10, sstables.size()); + + Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes())); + List<Range<Token>> ranges = Arrays.asList(range); + + UUID missingRepairSession = UUIDGen.getTimeUUID(); + try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION); + Refs<SSTableReader> refs = Refs.ref(sstables)) + { + Assert.assertFalse(refs.isEmpty()); + try + { + CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession); + Assert.fail("expected RuntimeException"); + } + catch (RuntimeException e) + { + // expected + } + Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); + Assert.assertTrue(refs.isEmpty()); + } + } ++ ++ @Test ++ public void testSSTablesToInclude() ++ { ++ ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); ++ List<SSTableReader> sstables = new ArrayList<>(); ++ sstables.add(MockSchema.sstable(1, 10, 100, cfs)); ++ sstables.add(MockSchema.sstable(2, 100, 200, cfs)); ++ ++ Range<Token> r = new Range<>(t(10), t(100)); // should include sstable 1 and 2 above, but none is fully contained (Range is (x, y]) + ++ Iterator<SSTableReader> sstableIterator = sstables.iterator(); ++ Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); ++ assertTrue(fullyContainedSSTables.isEmpty()); ++ assertEquals(2, sstables.size()); ++ } ++ ++ @Test ++ public void testSSTablesToInclude2() ++ { ++ ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); ++ List<SSTableReader> sstables = new ArrayList<>(); ++ SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs); ++ SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); ++ sstables.add(sstable1); ++ sstables.add(sstable2); ++ ++ Range<Token> r = new Range<>(t(9), t(100)); // sstable 1 is fully contained ++ ++ Iterator<SSTableReader> sstableIterator = sstables.iterator(); ++ Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); ++ assertEquals(Collections.singleton(sstable1), fullyContainedSSTables); ++ assertEquals(Collections.singletonList(sstable2), sstables); ++ } ++ ++ @Test(expected = IllegalStateException.class) ++ public void testSSTablesToNotInclude() ++ { ++ ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); ++ List<SSTableReader> sstables = new ArrayList<>(); ++ SSTableReader sstable1 = MockSchema.sstable(1, 0, 5, cfs); ++ sstables.add(sstable1); ++ ++ Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included ++ ++ Iterator<SSTableReader> sstableIterator = sstables.iterator(); ++ CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); ++ } ++ ++ @Test(expected = IllegalStateException.class) ++ public void testSSTablesToNotInclude2() ++ { ++ ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); ++ List<SSTableReader> 
sstables = new ArrayList<>(); ++ SSTableReader sstable1 = MockSchema.sstable(1, 10, 10, cfs); ++ SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); ++ sstables.add(sstable1); ++ sstables.add(sstable2); ++ ++ Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw ++ ++ Iterator<SSTableReader> sstableIterator = sstables.iterator(); ++ CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); ++ } ++ ++ @Test ++ public void testSSTablesToInclude4() ++ { ++ ColumnFamilyStore cfs = MockSchema.newCFS("anticomp"); ++ List<SSTableReader> sstables = new ArrayList<>(); ++ SSTableReader sstable1 = MockSchema.sstable(1, 10, 100, cfs); ++ SSTableReader sstable2 = MockSchema.sstable(2, 100, 200, cfs); ++ sstables.add(sstable1); ++ sstables.add(sstable2); ++ ++ Range<Token> r = new Range<>(t(9), t(200)); // sstable 2 is fully contained - last token is equal ++ ++ Iterator<SSTableReader> sstableIterator = sstables.iterator(); ++ Set<SSTableReader> fullyContainedSSTables = CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID()); ++ assertEquals(Sets.newHashSet(sstable1, sstable2), fullyContainedSSTables); ++ assertTrue(sstables.isEmpty()); ++ } ++ ++ private Token t(long t) ++ { ++ return new Murmur3Partitioner.LongToken(t); ++ } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d387f5e/test/unit/org/apache/cassandra/schema/MockSchema.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/schema/MockSchema.java index 99fff32,0000000..05de7ac mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/schema/MockSchema.java +++ b/test/unit/org/apache/cassandra/schema/MockSchema.java @@@ -1,187 -1,0 +1,197 @@@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.schema; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.schema.CachingParams; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.AlwaysPresentFilter; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class MockSchema +{ + static + { + Memory offsets = Memory.allocate(4); + offsets.setInt(0, 0); + indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1); + } + private static final AtomicInteger id = new AtomicInteger(); + public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1))); + + public static final IndexSummary indexSummary; + private static final FileHandle RANDOM_ACCESS_READER_FACTORY = new FileHandle.Builder(temp("mocksegmentedfile").getAbsolutePath()).complete(); + + public static Memtable memtable(ColumnFamilyStore cfs) + { + return new Memtable(cfs.metadata()); + } + + public static SSTableReader sstable(int generation, ColumnFamilyStore cfs) + { + return sstable(generation, false, cfs); + } + ++ public static SSTableReader sstable(int generation, long first, long last, ColumnFamilyStore cfs) ++ { ++ return sstable(generation, 0, false, first, last, cfs); ++ } ++ + public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore cfs) + { + return sstable(generation, 0, keepRef, cfs); + } + + public static SSTableReader sstable(int generation, int size, ColumnFamilyStore cfs) + { + return sstable(generation, size, false, cfs); + } - + public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs) + { ++ return sstable(generation, size, keepRef, generation, generation, cfs); ++ } ++ ++ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs) ++ { + Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), + cfs.keyspace.getName(), + cfs.getTableName(), + generation, SSTableFormat.Type.BIG); + Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC); + for (Component component : components) + { + File file = new File(descriptor.filenameFor(component)); + try + { + file.createNewFile(); + } + catch (IOException e) + { + } + } + 
if (size > 0) + { + try + { + File file = new File(descriptor.filenameFor(Component.DATA)); + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) + { + raf.setLength(size); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList()); + StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator) + .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header) + .get(MetadataType.STATS); + SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, + RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(), + new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header); - reader.first = reader.last = readerBounds(generation); ++ reader.first = readerBounds(firstToken); ++ reader.last = readerBounds(lastToken); + if (!keepRef) + reader.selfRef().release(); + return reader; + } + + public static ColumnFamilyStore newCFS() + { + return newCFS(ks.getName()); + } + + public static ColumnFamilyStore newCFS(String ksname) + { + String cfname = "mockcf" + (id.incrementAndGet()); + TableMetadata metadata = newTableMetadata(ksname, cfname); + return new ColumnFamilyStore(ks, cfname, 0, new TableMetadataRef(metadata), new Directories(metadata), false, false, false); + } + + public static TableMetadata newTableMetadata(String ksname, String cfname) + { + return TableMetadata.builder(ksname, cfname) + .partitioner(Murmur3Partitioner.instance) + .addPartitionKeyColumn("key", UTF8Type.instance) + .addClusteringColumn("col", UTF8Type.instance) + .addRegularColumn("value", UTF8Type.instance) + .caching(CachingParams.CACHE_NOTHING) + .build(); + } + - public static BufferDecoratedKey readerBounds(int generation) ++ public static BufferDecoratedKey readerBounds(long generation) + { + return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + + private static File temp(String id) + { + try + { + File file = File.createTempFile(id, "tmp"); + file.deleteOnExit(); + return file; + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public static void cleanup() + { + // clean up data directory which are stored as data directory/keyspace/data files + for (String dirName : DatabaseDescriptor.getAllDataFileLocations()) + { + File dir = new File(dirName); + if (!dir.exists()) + continue; + String[] children = dir.list(); + for (String child : children) + FileUtils.deleteRecursive(new File(dir, child)); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
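
A note on the semantics behind CASSANDRA-14411, for readers of the diff above: Range<Token> is start-exclusive/end-inclusive, i.e. (x, y], while an sstable's first and last keys describe an inclusive span. That is why the patch models each sstable as a Bounds<Token> and checks both endpoint tokens against the normalized repaired ranges, and why one test builds its range starting at "/".getBytes() (47) so that the first sstable token "0".getBytes() (48) is actually covered. The sketch below is illustrative only (the class name and main method are not part of the patch); it uses only Range, Bounds and Murmur3Partitioner.LongToken calls that already appear in the diff, and mirrors the testSSTablesToInclude case.

    import org.apache.cassandra.dht.Bounds;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    // Hypothetical standalone sketch, not part of the patch: it only exercises
    // the interval semantics used by CompactionManager.findSSTablesToAnticompact.
    public class BoundsVsRangeSketch
    {
        private static Token t(long token)
        {
            return new Murmur3Partitioner.LongToken(token);
        }

        public static void main(String[] args)
        {
            // A normalized repaired range is start-exclusive/end-inclusive: (10, 100]
            Range<Token> repairedRange = new Range<>(t(10), t(100));

            // An sstable whose first token is exactly 10 is not fully contained,
            // because the range excludes its own left endpoint...
            System.out.println(repairedRange.contains(t(10)));   // false
            System.out.println(repairedRange.contains(t(100)));  // true

            // ...but its Bounds [10, 100] still intersect the range, so such an
            // sstable must be anticompacted rather than only having its repair
            // status mutated.
            Bounds<Token> sstableBounds = new Bounds<>(t(10), t(100));
            System.out.println(repairedRange.intersects(sstableBounds)); // true
        }
    }

This is the behaviour the new tests pin down: with the (10, 100] range, testSSTablesToInclude gets an empty "fully contained" set and both mock sstables stay in the anticompaction transaction, whereas widening the range to (9, 100] in testSSTablesToInclude2 makes the 10..100 sstable fully contained, so it is marked with the new repaired/pending-repair status without being rewritten.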
