Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6404e015
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6404e015
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6404e015

Branch: refs/heads/trunk
Commit: 6404e015ffe916fe24bacb3772b388e633af1261
Parents: c799a98 f43efaa
Author: Aleksey Yeschenko <[email protected]>
Authored: Fri May 1 19:50:06 2015 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Fri May 1 19:50:06 2015 +0300

----------------------------------------------------------------------
 .../db/commitlog/CommitLogArchiver.java    |   2 +-
 .../db/commitlog/CommitLogReplayer.java    |  11 +-
 .../db/commitlog/CommitLogStressTest.java  |   3 +-
 .../db/RecoveryManagerTruncateTest.java    | 181 +++++++++++++++----
 4 files changed, 149 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------

diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index f6d1cc4,57f4b90..23ee9e3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -271,17 -264,22 +271,17 @@@ public class CommitLogReplaye
      public void recover(File file) throws IOException
      {
-         logger.info("Replaying {}", file.getPath());
          CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-         final long segmentId = desc.id;
-         logger.info("Replaying {} (CL version {}, messaging version {})",
-                     file.getPath(),
-                     desc.version,
-                     desc.getMessagingVersion());
          RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
-         try
          {
-             assert reader.length() <= Integer.MAX_VALUE;
-             int offset = getStartOffset(segmentId, desc.version);
-             if (offset < 0)
+         if (desc.version < CommitLogDescriptor.VERSION_21)
          {
-             logger.debug("skipping replay of fully-flushed {}", file);
+             if (logAndCheckIfShouldSkip(file, desc))
+                 return;
+             if (globalPosition.segment == desc.id)
+                 reader.seek(globalPosition.position);
-             replaySyncSection(reader, -1, desc, replayFilter);
++            replaySyncSection(reader, -1, desc);
              return;
          }

@@@ -365,208 -411,71 +365,207 @@@
                  continue;
              }

-             if (!replaySyncSection(sectionReader, replayEnd, desc, replayFilter))
-                 if (logger.isDebugEnabled())
-                     logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
++            if (!replaySyncSection(sectionReader, replayEnd, desc))
+                 break;
+         }
+     }
+     finally
+     {
+         FileUtils.closeQuietly(reader);
+         logger.info("Finished reading {}", file);
+     }
+ }

-             final long entryLocation = reader.getFilePointer();
-             Runnable runnable = new WrappedRunnable()
-             {
-                 public void runMayThrow() throws IOException
-                 {
-                     if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                         return;
-                     if (pointInTimeExceeded(mutation))
-                         return;
-
-                     final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
-                     // Rebuild the mutation, omitting column families that
-                     // a) the user has requested that we ignore,
-                     // b) have already been flushed,
-                     // or c) are part of a cf that was dropped.
-                     // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                     Mutation newMutation = null;
-                     for (ColumnFamily columnFamily : replayFilter.filter(mutation))
-                     {
-                         if (Schema.instance.getCF(columnFamily.id()) == null)
-                             continue; // dropped
-
-                         ReplayPosition rp = cfPositions.get(columnFamily.id());
-
-                         // replay if current segment is newer than last flushed one or,
-                         // if it is the last known segment, if we are after the replay position
-                         if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
-                         {
-                             if (newMutation == null)
-                                 newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                             newMutation.add(columnFamily);
-                             replayedCount.incrementAndGet();
-                         }
-                     }
-                     if (newMutation != null)
-                     {
-                         assert !newMutation.isEmpty();
-                         Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
-                         keyspacesRecovered.add(keyspace);
-                     }
-                 }
-             };
-             futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-             if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-             {
-                 FBUtilities.waitOnFutures(futures);
-                 futures.clear();
-             }
+ public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+ {
+     logger.info("Replaying {} (CL version {}, messaging version {}, compression {})",
+                 file.getPath(),
+                 desc.version,
+                 desc.getMessagingVersion(),
+                 desc.compression);
+
+     if (globalPosition.segment > desc.id)
+     {
+         logger.debug("skipping replay of fully-flushed {}", file);
+         return true;
+     }
+     return false;
+ }
+
+ /**
+  * Replays a sync section containing a list of mutations.
+  *
+  * @return Whether replay should continue with the next section.
+  */
- private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc,
-                                   final ReplayFilter replayFilter) throws IOException, FileNotFoundException
++private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException, FileNotFoundException
+ {
+     /* read the logs populate Mutation and apply */
+     while (reader.getFilePointer() < end && !reader.isEOF())
+     {
+         if (logger.isTraceEnabled())
+             logger.trace("Reading mutation at {}", reader.getFilePointer());
+
+         long claimedCRC32;
+         int serializedSize;
+         try
+         {
+             // any of the reads may hit EOF
+             serializedSize = reader.readInt();
+             if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+             {
+                 logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                 return false;
+             }
+             // Mutation must be at LEAST 10 bytes:
+             // 3 each for a non-empty Keyspace and Key (including the
+             // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+             // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+             if (serializedSize < 10)
+                 return false;
+
+             long claimedSizeChecksum;
              if (desc.version < CommitLogDescriptor.VERSION_21)
-                 break;
+                 claimedSizeChecksum = reader.readLong();
+             else
+                 claimedSizeChecksum = reader.readInt() & 0xffffffffL;
+             checksum.reset();
+             if (desc.version < CommitLogDescriptor.VERSION_20)
+                 checksum.update(serializedSize);
+             else
+                 checksum.updateInt(serializedSize);
+
+             if (checksum.getValue() != claimedSizeChecksum)
+                 return false;
+             // ok.
-             offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
-             prevEnd = end;
+             if (serializedSize > buffer.length)
+                 buffer = new byte[(int) (1.2 * serializedSize)];
+             reader.readFully(buffer, 0, serializedSize);
+             if (desc.version < CommitLogDescriptor.VERSION_21)
+                 claimedCRC32 = reader.readLong();
+             else
+                 claimedCRC32 = reader.readInt() & 0xffffffffL;
+         }
+         catch (EOFException eof)
+         {
+             return false; // last CL entry didn't get completely written. that's ok.
+         }
+
+         checksum.update(buffer, 0, serializedSize);
+         if (claimedCRC32 != checksum.getValue())
+         {
+             // this entry must not have been fsynced. probably the rest is bad too,
+             // but just in case there is no harm in trying them (since we still read on an entry boundary)
+             continue;
+         }
-         replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter);
++        replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
      }
-     finally
+     return true;
+ }
+
+ /**
+  * Deserializes and replays a commit log entry.
+  */
+ void replayMutation(byte[] inputBuffer, int size,
-                     final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException,
++                    final long entryLocation, final CommitLogDescriptor desc) throws IOException,
+     FileNotFoundException
+ {
+     FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+     final Mutation mutation;
+     try
      {
-         FileUtils.closeQuietly(reader);
-         logger.info("Finished reading {}", file);
+         mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+                                                    desc.getMessagingVersion(),
+                                                    ColumnSerializer.Flag.LOCAL);
+         // double-check that what we read is [still] valid for the current schema
+         for (ColumnFamily cf : mutation.getColumnFamilies())
+             for (Cell cell : cf)
+                 cf.getComparator().validate(cell.name());
+     }
+     catch (UnknownColumnFamilyException ex)
+     {
+         if (ex.cfId == null)
+             return;
+         AtomicInteger i = invalidMutations.get(ex.cfId);
+         if (i == null)
+         {
+             i = new AtomicInteger(1);
+             invalidMutations.put(ex.cfId, i);
+         }
+         else
+             i.incrementAndGet();
+         return;
+     }
+     catch (Throwable t)
+     {
+         JVMStabilityInspector.inspectThrowable(t);
+         File f = File.createTempFile("mutation", "dat");
+         DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+         try
+         {
+             out.write(inputBuffer, 0, size);
+         }
+         finally
+         {
+             out.close();
+         }
+         String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ",
+                                   f.getAbsolutePath());
+         logger.error(st, t);
+         return;
+     }
+
+     if (logger.isDebugEnabled())
+         logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+
+     Runnable runnable = new WrappedRunnable()
+     {
+         public void runMayThrow() throws IOException
+         {
+             if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                 return;
+             if (pointInTimeExceeded(mutation))
+                 return;
+
+             final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+             // Rebuild the mutation, omitting column families that
+             // a) the user has requested that we ignore,
+             // b) have already been flushed,
+             // or c) are part of a cf that was dropped.
+             // Keep in mind that the cf.name() is suspect. do everything based on the cfid instead.
+             Mutation newMutation = null;
+             for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+             {
+                 if (Schema.instance.getCF(columnFamily.id()) == null)
+                     continue; // dropped
+
+                 ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                 // replay if current segment is newer than last flushed one or,
+                 // if it is the last known segment, if we are after the replay position
+                 if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+                 {
+                     if (newMutation == null)
+                         newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+                     newMutation.add(columnFamily);
+                     replayedCount.incrementAndGet();
+                 }
+             }
+             if (newMutation != null)
+             {
+                 assert !newMutation.isEmpty();
+                 Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+                 keyspacesRecovered.add(keyspace);
+             }
+         }
+     };
+     futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+     if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+     {
+         FBUtilities.waitOnFutures(futures);
+         futures.clear();
      }
  }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------

diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 644e2c2,0000000..a8cf8fd
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -1,412 -1,0 +1,411 @@@
+package org.apache.cassandra.db.commitlog;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.Assert;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+
+public class CommitLogStressTest
+{
+
+    public static ByteBuffer dataSource;
+
+    public static int NUM_THREADS = 4 * Runtime.getRuntime().availableProcessors() - 1;
+
+    public static int numCells = 1;
+
+    public static int cellSize = 1024;
+
+    public static int rateLimit = 0;
+
+    public static int runTimeMs = 10000;
+
+    public static String location = DatabaseDescriptor.getCommitLogLocation() + "/stress";
+
+    public static int hash(int hash, ByteBuffer bytes)
+    {
+        int shift = 0;
+        for (int i=0; i<bytes.limit(); i++) {
+            hash += (bytes.get(i) & 0xFF) << shift;
+            shift = (shift + 8) & 0x1F;
+        }
+        return hash;
+    }
+
+    public static void main(String[] args) throws Exception {
+        try {
+            if (args.length >= 1) {
+                NUM_THREADS = Integer.parseInt(args[0]);
+                System.out.println("Setting num threads to: " + NUM_THREADS);
+            }
+
+            if (args.length >= 2) {
+                numCells = Integer.parseInt(args[1]);
+                System.out.println("Setting num cells to: " + numCells);
+            }
+
+            if (args.length >= 3) {
+                cellSize = Integer.parseInt(args[2]);
+                System.out.println("Setting cell size to: " + cellSize + " be aware the source corpus may be small");
+            }
+
+            if (args.length >= 4) {
+                rateLimit = Integer.parseInt(args[3]);
+                System.out.println("Setting per thread rate limit to: " + rateLimit);
+            }
+            initialize();
+
+            CommitLogStressTest tester = new CommitLogStressTest();
+            tester.testFixedSize();
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace(System.err);
+        }
+        finally {
+            System.exit(0);
+        }
+    }
+
+    boolean failed = false;
+    volatile boolean stop = false;
+    boolean randomSize = false;
+    boolean discardedRun = false;
+    ReplayPosition discardedPos;
+
+    @BeforeClass
+    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    {
+        try (FileInputStream fis = new FileInputStream("CHANGES.txt"))
+        {
+            dataSource = ByteBuffer.allocateDirect((int)fis.getChannel().size());
+            while (dataSource.hasRemaining()) {
+                fis.getChannel().read(dataSource);
+            }
+            dataSource.flip();
+        }
+
+        SchemaLoader.loadSchema();
+        SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
+
+        File dir = new File(location);
+        if (dir.isDirectory())
+        {
+            File[] files = dir.listFiles();
+
+            for (File f : files)
+                if (!f.delete())
+                    Assert.fail("Failed to delete " + f);
+        } else {
+            dir.mkdir();
+        }
+    }
+
+    @Test
+    public void testRandomSize() throws Exception
+    {
+        randomSize = true;
+        discardedRun = false;
+        testAllLogConfigs();
+    }
+
+    @Test
+    public void testFixedSize() throws Exception
+    {
+        randomSize = false;
+        discardedRun = false;
+
+        testAllLogConfigs();
+    }
+
+    @Test
+    public void testDiscardedRun() throws Exception
+    {
+        discardedRun = true;
+        randomSize = true;
+
+        testAllLogConfigs();
+    }
+
+    public void testAllLogConfigs() throws IOException, InterruptedException
+    {
+        failed = false;
+        DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
+        DatabaseDescriptor.setCommitLogSyncPeriod(30);
+        DatabaseDescriptor.setCommitLogSegmentSize(32);
+        for (ParameterizedClass compressor : new ParameterizedClass[] {
+                null,
+                new ParameterizedClass("LZ4Compressor", null),
+                new ParameterizedClass("SnappyCompressor", null),
+                new ParameterizedClass("DeflateCompressor", null)}) {
+            DatabaseDescriptor.setCommitLogCompression(compressor);
+            for (CommitLogSync sync : CommitLogSync.values())
+            {
+                DatabaseDescriptor.setCommitLogSync(sync);
+                CommitLog commitLog = new CommitLog(location, CommitLog.instance.archiver);
+                testLog(commitLog);
+            }
+        }
+        assert !failed;
+    }
+
+    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+        System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
+                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.executor.getClass().getSimpleName(),
+                          randomSize ? " random size" : "",
+                          discardedRun ? " with discarded run" : "");
+        commitLog.allocator.enableReserveSegmentCreation();
+
+        final List<CommitlogExecutor> threads = new ArrayList<>();
+        ScheduledExecutorService scheduled = startThreads(commitLog, threads);
+
+        discardedPos = ReplayPosition.NONE;
+        if (discardedRun) {
+            // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
+            Thread.sleep(runTimeMs / 3);
+            stop = true;
+            scheduled.shutdown();
+            scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+            for (CommitlogExecutor t: threads)
+            {
+                t.join();
+                CommitLog.instance.discardCompletedSegments( Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp);
+                if (t.rp.compareTo(discardedPos) > 0)
+                    discardedPos = t.rp;
+            }
+            threads.clear();
+            System.out.format("Discarded at %s\n", discardedPos);
+
+            scheduled = startThreads(commitLog, threads);
+        }
+
+
+        Thread.sleep(runTimeMs);
+        stop = true;
+        scheduled.shutdown();
+        scheduled.awaitTermination(2, TimeUnit.SECONDS);
+
+        int hash = 0;
+        int cells = 0;
+        for (CommitlogExecutor t: threads) {
+            t.join();
+            hash += t.hash;
+            cells += t.cells;
+        }
+
+        commitLog.shutdownBlocking();
+
+        System.out.print("Stopped. Replaying... "); System.out.flush();
+        Replayer repl = new Replayer();
+        File[] files = new File(location).listFiles();
+        repl.recover(files);
+
+        for (File f : files)
+            if (!f.delete())
+                Assert.fail("Failed to delete " + f);
+
+        if (hash == repl.hash && cells == repl.cells)
+            System.out.println("Test success.");
+        else
+        {
+            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n", repl.cells, cells, repl.hash, hash);
+            failed = true;
+        }
+    }
+
+    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+    {
+        stop = false;
+        for (int ii = 0; ii < NUM_THREADS; ii++) {
+            final CommitlogExecutor t = new CommitlogExecutor(commitLog);
+            threads.add(t);
+            t.start();
+        }
+
+        final long start = System.currentTimeMillis();
+        Runnable printRunnable = new Runnable() {
+            long lastUpdate = 0;
+
+            public void run() {
+                Runtime runtime = Runtime.getRuntime();
+                long maxMemory = mb(runtime.maxMemory());
+                long allocatedMemory = mb(runtime.totalMemory());
+                long freeMemory = mb(runtime.freeMemory());
+                long temp = 0;
+                long sz = 0;
+                for (CommitlogExecutor cle : threads) {
+                    temp += cle.counter.get();
+                    sz += cle.dataSize;
+                }
+                double time = (System.currentTimeMillis() - start) / 1000.0;
+                double avg = (temp / time);
+                System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+                                                 ((System.currentTimeMillis() - start) / 1000),
+                                                 maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time)));
+                lastUpdate = temp;
+            }
+        };
+        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+        scheduled.scheduleAtFixedRate(printRunnable, 1, 1, TimeUnit.SECONDS);
+        return scheduled;
+    }
+
+    private static long mb(long maxMemory) {
+        return maxMemory / (1024 * 1024);
+    }
+
+    private static double mb(double maxMemory) {
+        return maxMemory / (1024 * 1024);
+    }
+
+    public static ByteBuffer randomBytes(int quantity, ThreadLocalRandom tlr) {
+        ByteBuffer slice = ByteBuffer.allocate(quantity);
+        ByteBuffer source = dataSource.duplicate();
+        source.position(tlr.nextInt(source.capacity() - quantity));
+        source.limit(source.position() + quantity);
+        slice.put(source);
+        slice.flip();
+        return slice;
+    }
+
+    public class CommitlogExecutor extends Thread {
+        final AtomicLong counter = new AtomicLong();
+        int hash = 0;
+        int cells = 0;
+        int dataSize = 0;
+        final CommitLog commitLog;
+
+        volatile ReplayPosition rp;
+
+        public CommitlogExecutor(CommitLog commitLog)
+        {
+            this.commitLog = commitLog;
+        }
+
+        public void run() {
+            RateLimiter rl = rateLimit != 0 ? RateLimiter.create(rateLimit) : null;
+            final ThreadLocalRandom tlr = ThreadLocalRandom.current();
+            while (!stop) {
+                if (rl != null)
+                    rl.acquire();
+                String ks = "Keyspace1";
+                ByteBuffer key = randomBytes(16, tlr);
+                Mutation mutation = new Mutation(ks, key);
+
+                for (int ii = 0; ii < numCells; ii++) {
+                    int sz = randomSize ? tlr.nextInt(cellSize) : cellSize;
+                    ByteBuffer bytes = randomBytes(sz, tlr);
+                    mutation.add("Standard1", Util.cellname("name" + ii), bytes,
+                                 System.currentTimeMillis());
+                    hash = hash(hash, bytes);
+                    ++cells;
+                    dataSize += sz;
+                }
+                rp = commitLog.add(mutation);
+                counter.incrementAndGet();
+            }
+        }
+    }
+
+    class Replayer extends CommitLogReplayer
+    {
+        Replayer()
+        {
+            super(discardedPos, null, ReplayFilter.create());
+        }
+
+        int hash = 0;
+        int cells = 0;
+
+        @Override
-        void replayMutation(byte[] inputBuffer, int size,
-                            final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter)
++       void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
+        {
+            if (desc.id < discardedPos.segment) {
+                System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
+                return;
+            } else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+                // Skip over this mutation.
+                return;
+
+            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+            Mutation mutation;
+            try
+            {
+                mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+                                                           desc.getMessagingVersion(),
+                                                           ColumnSerializer.Flag.LOCAL);
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+
+            for (ColumnFamily cf : mutation.getColumnFamilies()) {
+                for (Cell c : cf.getSortedColumns()) {
+                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
+                    {
+                        hash = hash(hash, c.value());
+                        ++cells;
+                    }
+                }
+            }
+        }
+
+    }
+}


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6404e015/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------

diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index cef5914,817b8e9..a004105
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,78 -19,165 +19,181 @@@ package org.apache.cassandra.db
  
  import static org.apache.cassandra.Util.column;
- import static org.junit.Assert.assertNotNull;
- import static org.junit.Assert.assertNull;
+ import static org.junit.Assert.*;
  
  import java.io.IOException;
  
++import org.junit.BeforeClass;
+ import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
- import org.junit.BeforeClass;
- import org.junit.Test;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  /**
   * Test for the truncate operation.
  */
-public class RecoveryManagerTruncateTest extends SchemaLoader
+public class RecoveryManagerTruncateTest
 {
+    private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
+    private static final String CF_STANDARD1 = "Standard1";
++   private static final String CF_STANDARD2 = "Standard2";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
++                                   SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
++                                   SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+    }
+
-    @Test
-    public void testTruncate() throws IOException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+    @Test
+    public void testTruncate() throws IOException
+    {
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++       Keyspace keyspace = Keyspace.open(KEYSPACE1);
++       ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 
-        Mutation rm;
-        ColumnFamily cf;
+        Mutation rm;
+        ColumnFamily cf;
 
-        // add a single cell
-        cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        cf.addColumn(column("col1", "val1", 1L));
+        // add a single cell
-        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++       cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+        cf.addColumn(column("col1", "val1", 1L));
-        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
-        rm.apply();
+        rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
-        rm.applyUnsafe();
++       rm.applyUnsafe();
+        long time = System.currentTimeMillis();
 
-        // Make sure data was written
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
+        // Make sure data was written
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
 
-        // and now truncate it
-        cfs.truncateBlocking();
+        // and now truncate it
+        cfs.truncateBlocking();
-        CommitLog.instance.resetUnsafe();
-        CommitLog.instance.recover();
+        CommitLog.instance.resetUnsafe(false);
 
-        // and validate truncation.
-        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
-    }
-
-    private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
-    {
-        ColumnFamily cf;
-        ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
-        if (cfStore == null)
-        {
-            return null;
-        }
-        cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
-        if (cf == null)
-        {
-            return null;
-        }
-        return cf.getColumn(Util.cellname(columnName));
-    }
+        // and validate truncation.
-        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col1"));
++       assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col1"));
+        assertTrue(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+    }
+
+    @Test
+    public void testTruncatePointInTime() throws IOException
+    {
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
++       Keyspace keyspace = Keyspace.open(KEYSPACE1);
++       ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+
+        Mutation rm;
+        ColumnFamily cf;
+
+        // add a single cell
-        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++       cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+        cf.addColumn(column("col2", "val1", 1L));
-        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++       rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // Make sure data was written
+        long time = System.currentTimeMillis();
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+
+        // and now truncate it
+        cfs.truncateBlocking();
+
+        // verify truncation
-        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++       assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+
+        try
+        {
+            // Restore to point in time.
+            CommitLog.instance.archiver.restorePointInTime = time;
-            CommitLog.instance.resetUnsafe();
-            CommitLog.instance.recover();
++           CommitLog.instance.resetUnsafe(false);
+        }
+        finally
+        {
+            CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+        }
+
+        // Validate pre-truncation data was restored.
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col2"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col2"));
+        // And that we don't have a truncation record after restore time.
+        assertFalse(SystemKeyspace.getTruncatedAt(cfs.metadata.cfId) > time);
+    }
+
+    @Test
+    public void testTruncatePointInTimeReplayList() throws IOException
+    {
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore("Standard1");
-        ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("Standard2");
++       Keyspace keyspace = Keyspace.open(KEYSPACE1);
++       ColumnFamilyStore cfs1 = keyspace.getColumnFamilyStore(CF_STANDARD1);
++       ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore(CF_STANDARD2);
+
+        Mutation rm;
+        ColumnFamily cf;
+
+        // add a single cell
-        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
++       cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD1);
+        cf.addColumn(column("col3", "val1", 1L));
-        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++       rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // add a single cell
-        cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard2");
++       cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, CF_STANDARD2);
+        cf.addColumn(column("col4", "val1", 1L));
-        rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("keymulti"), cf);
++       rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("keymulti"), cf);
+        rm.apply();
+
+        // Make sure data was written
+        long time = System.currentTimeMillis();
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
-        assertNotNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+        // and now truncate it
+        cfs1.truncateBlocking();
+        cfs2.truncateBlocking();
+
+        // verify truncation
-        assertNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
-        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++       assertNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
++       assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+        try
+        {
+            // Restore to point in time.
+            CommitLog.instance.archiver.restorePointInTime = time;
-            System.setProperty("cassandra.replayList", "Keyspace1.Standard1");
-            CommitLog.instance.resetUnsafe();
-            CommitLog.instance.recover();
++           System.setProperty("cassandra.replayList", KEYSPACE1 + "." + CF_STANDARD1);
++           CommitLog.instance.resetUnsafe(false);
+        }
+        finally
+        {
+            CommitLog.instance.archiver.restorePointInTime = Long.MAX_VALUE;
+            System.clearProperty("cassandra.replayList");
+        }
+
+        // Validate pre-truncation data was restored.
-        assertNotNull(getFromTable(keyspace, "Standard1", "keymulti", "col3"));
++       assertNotNull(getFromTable(keyspace, CF_STANDARD1, "keymulti", "col3"));
+        // But only on the replayed table.
-        assertNull(getFromTable(keyspace, "Standard2", "keymulti", "col4"));
++       assertNull(getFromTable(keyspace, CF_STANDARD2, "keymulti", "col4"));
+
+        // And that we have the correct truncation records.
+        assertFalse(SystemKeyspace.getTruncatedAt(cfs1.metadata.cfId) > time);
+        assertTrue(SystemKeyspace.getTruncatedAt(cfs2.metadata.cfId) > time);
+    }
+
+    private Cell getFromTable(Keyspace keyspace, String cfName, String keyName, String columnName)
+    {
+        ColumnFamily cf;
+        ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(cfName);
+        if (cfStore == null)
+        {
+            return null;
+        }
+        cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk(keyName), columnName));
+        if (cf == null)
+        {
+            return null;
+        }
+        return cf.getColumn(Util.cellname(columnName));
+    }
 }
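----------------------------------------------------------------------

The record framing that the new replaySyncSection() validates is easy to get wrong, so here is a compact, self-contained sketch of it: a record is [int size][checksum over size][payload][checksum over payload], a size-checksum mismatch ends the section, and a payload-checksum mismatch only skips that one record. This is an illustration, not Cassandra's code: the class and method names below are hypothetical, it uses plain java.util.zip.CRC32, and it ignores the per-version differences (long vs. int checksums, PureJavaCrc32.updateInt()) that the hunk above handles.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.zip.CRC32;

    // Hypothetical sketch of the framing replaySyncSection() validates:
    //   [int size][int crc(size)][size payload bytes][int crc(payload)]
    public class RecordFramingSketch
    {
        public enum Outcome { RECORD, SKIP_RECORD, END_OF_SECTION }

        private final CRC32 checksum = new CRC32();
        private final byte[] intBuf = new byte[4];

        // Reads one framed record; on success, out[0] holds the payload.
        public Outcome readRecord(DataInputStream in, byte[][] out) throws IOException
        {
            try
            {
                int size = in.readInt();
                if (size == 0)
                    return Outcome.END_OF_SECTION; // end-of-segment marker
                if (size < 10)
                    return Outcome.END_OF_SECTION; // below the minimum mutation size from the hunk above

                checksum.reset();
                updateWithInt(size);
                long claimedSizeCrc = in.readInt() & 0xffffffffL;
                if (checksum.getValue() != claimedSizeCrc)
                    return Outcome.END_OF_SECTION; // the size field itself is unreliable: stop

                byte[] payload = new byte[size];
                in.readFully(payload);
                long claimedPayloadCrc = in.readInt() & 0xffffffffL;
                checksum.reset();
                checksum.update(payload, 0, size);
                if (checksum.getValue() != claimedPayloadCrc)
                    return Outcome.SKIP_RECORD; // entry not fully synced; the next one may still be fine

                out[0] = payload;
                return Outcome.RECORD;
            }
            catch (EOFException e)
            {
                // the last entry didn't get completely written; that's ok
                return Outcome.END_OF_SECTION;
            }
        }

        private void updateWithInt(int v)
        {
            intBuf[0] = (byte) (v >>> 24);
            intBuf[1] = (byte) (v >>> 16);
            intBuf[2] = (byte) (v >>> 8);
            intBuf[3] = (byte) v;
            checksum.update(intBuf, 0, 4); // java.util.zip.CRC32 has no updateInt()
        }
    }

A replay loop would call readRecord() until it returns END_OF_SECTION, deserializing each RECORD payload and ignoring SKIP_RECORD, which mirrors the while loop in replaySyncSection() above.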

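----------------------------------------------------------------------

Both the replayer and the stress test's Replayer decide whether an entry still matters by comparing (segment id, offset) pairs, as in the desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position) check above. A minimal model of that ordering, with a hypothetical class name standing in for ReplayPosition:

    // Minimal model of the (segment, position) ordering used above to decide
    // whether a commit log entry still needs replaying. Hypothetical stand-in
    // for org.apache.cassandra.db.commitlog.ReplayPosition.
    final class ReplayPositionModel implements Comparable<ReplayPositionModel>
    {
        final long segment;  // commit log segment id (monotonically increasing)
        final int position;  // byte offset of the entry within that segment

        ReplayPositionModel(long segment, int position)
        {
            this.segment = segment;
            this.position = position;
        }

        @Override
        public int compareTo(ReplayPositionModel that)
        {
            int cmp = Long.compare(this.segment, that.segment);
            return cmp != 0 ? cmp : Integer.compare(this.position, that.position);
        }

        // Mirrors the check in replayMutation(): replay iff the entry was
        // written after the last flushed/discarded position.
        boolean needsReplay(long entrySegment, long entryLocation)
        {
            return entrySegment > segment
                   || (entrySegment == segment && entryLocation > position);
        }
    }

The stress test's discardedRun path exercises exactly this boundary: mutations at or before discardedPos must be rejected on replay, while everything written after it must be recovered.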