Add Commitlog Versioning. Patch by Vijay; reviewed by jbellis for CASSANDRA-4357
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57998976 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57998976 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57998976 Branch: refs/heads/trunk Commit: 57998976f0024776bab6b2301f2436ea60e38fe0 Parents: 7cb3b73 Author: Vijay Parthasarathy <[email protected]> Authored: Thu Jun 28 14:35:37 2012 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Thu Jun 28 14:37:15 2012 -0700 ---------------------------------------------------------------------- src/java/org/apache/cassandra/config/Schema.java | 6 +- .../apache/cassandra/db/commitlog/CommitLog.java | 2 +- .../cassandra/db/commitlog/CommitLogAllocator.java | 4 +- .../cassandra/db/commitlog/CommitLogArchiver.java | 7 +- .../db/commitlog/CommitLogDescriptor.java | 102 +++++++++++++++ .../cassandra/db/commitlog/CommitLogReplayer.java | 11 +- .../cassandra/db/commitlog/CommitLogSegment.java | 42 +------ .../org/apache/cassandra/db/CommitLogTest.java | 24 ++++- 8 files changed, 144 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 7d52d2e..c36ee77 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -20,7 +20,6 @@ package org.apache.cassandra.config; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -34,6 +33,7 @@ import 
org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.db.Row; import org.apache.cassandra.db.SystemTable; import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.utils.Pair; @@ -347,12 +347,12 @@ public class Schema oldCfIdMap.put(oldId, newId); } - public UUID convertOldCfId(Integer oldCfId) + public UUID convertOldCfId(Integer oldCfId) throws UnknownColumnFamilyException { UUID cfId = oldCfIdMap.get(oldCfId); if (cfId == null) - throw new IllegalArgumentException("ColumnFamily identified by old " + oldCfId + " was not found."); + throw new UnknownColumnFamilyException("ColumnFamily identified by old " + oldCfId + " was not found.", null); return cfId; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 6649ae4..e339cbc 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -112,7 +112,7 @@ public class CommitLog implements CommitLogMBean // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes) // until after recover was finished. this turns out to be fragile; it is less error-prone to go // ahead and allow writes before recover(), and just skip active segments when we do. 
- return CommitLogSegment.possibleCommitLogFile(name) && !instance.allocator.manages(name); + return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name); } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java index e402a4a..dff4093 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java @@ -39,6 +39,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Table; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.WrappedRunnable; @@ -178,7 +179,8 @@ public class CommitLogAllocator public void recycleSegment(final File file) { // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests - if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()) + if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize() + || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version) { // (don't decrease managed size, since this was never a "live" segment) try http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index 556a37a..b8c8dfb 
100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -20,7 +20,6 @@ package org.apache.cassandra.db.commitlog; * */ - import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -31,7 +30,6 @@ import java.util.Properties; import java.util.concurrent.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; -import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.FBUtilities; @@ -148,10 +146,7 @@ public class CommitLogArchiver File[] files = new File(dir).listFiles(); for (File fromFile : files) { - File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), - CommitLogSegment.FILENAME_PREFIX + - System.nanoTime() + - CommitLogSegment.FILENAME_EXTENSION); + File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), new CommitLogDescriptor(System.nanoTime()).fileName()); String command = restoreCommand.replace("%from", fromFile.getPath()); command = command.replace("%to", toFile.getPath()); exec(command); http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java new file mode 100644 index 0000000..1d67f5c --- /dev/null +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java @@ -0,0 +1,102 @@ +package org.apache.cassandra.db.commitlog; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.cassandra.net.MessagingService; + +public class CommitLogDescriptor +{ + private static final String SEPARATOR = "-"; + private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR; + private static final String FILENAME_EXTENSION = ".log"; + // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log. + private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); + + public static final int LEGACY_VERSION = 1; + public static final int VERSION_12 = 2; + /** + * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. 
+ * Note: make sure to handle {@link #getMessagingVersion()} + */ + public static final int current_version = VERSION_12; + + private final int version; + public final long id; + + public CommitLogDescriptor(int version, long id) + { + this.version = version; + this.id = id; + } + + public CommitLogDescriptor(long id) + { + this(current_version, id); + } + + public static CommitLogDescriptor fromFileName(String name) + { + Matcher matcher; + if (!(matcher = COMMIT_LOG_FILE_PATTERN.matcher(name)).matches()) + throw new RuntimeException("Cannot parse the version of the file: " + name); + + if (matcher.group(3) != null) + { + long id = Long.valueOf(matcher.group(3).split(SEPARATOR)[1]); + return new CommitLogDescriptor(Integer.valueOf(matcher.group(2)), id); + } + else + { + long id = Long.valueOf(matcher.group(1)); + return new CommitLogDescriptor(LEGACY_VERSION, id); + } + } + + public int getMessagingVersion() + { + switch (version) + { + case LEGACY_VERSION: + return MessagingService.VERSION_11; + case VERSION_12: + return MessagingService.VERSION_12; + default: + throw new IllegalStateException("Unknown commitlog version " + version); + } + } + + public String fileName() + { + return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION; + } + + /** + * @param filename the filename to check + * @return true if filename could be a commit log based on it's filename + */ + public static boolean isValid(String filename) + { + return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 5dc29ac..6a20027 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -39,7 +39,6 @@ import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.PureJavaCrc32; @@ -59,7 +58,7 @@ public class CommitLogReplayer private final Set<Table> tablesRecovered; private final List<Future<?>> futures; private final Map<UUID, AtomicInteger> invalidMutations; -private final AtomicInteger replayedCount; + private final AtomicInteger replayedCount; private final Map<UUID, ReplayPosition> cfPositions; private final ReplayPosition globalPosition; private final Checksum checksum; @@ -113,7 +112,9 @@ private final AtomicInteger replayedCount; public void recover(File file) throws IOException { logger.info("Replaying " + file.getPath()); - final long segment = CommitLogSegment.idFromFilename(file.getName()); + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); + final long segment = desc.id; + int version = desc.getMessagingVersion(); RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true); assert reader.length() <= Integer.MAX_VALUE; try @@ -195,10 +196,12 @@ private final AtomicInteger replayedCount; { // assuming version here. We've gone to lengths to make sure what gets written to the CL is in // the current version. so do make sure the CL is drained prior to upgrading a node. 
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), MessagingService.current_version, IColumnSerializer.Flag.LOCAL); + rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, IColumnSerializer.Flag.LOCAL); } catch (UnknownColumnFamilyException ex) { + if (ex.cfId == null) + continue; AtomicInteger i = invalidMutations.get(ex.cfId); if (i == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 0f3cb06..bf67095 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -25,8 +25,6 @@ import java.nio.channels.FileChannel; import java.nio.MappedByteBuffer; import java.util.Collection; import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.zip.Checksum; import java.util.HashMap; @@ -51,10 +49,6 @@ public class CommitLogSegment { private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); - static final String FILENAME_PREFIX = "CommitLog-"; - static final String FILENAME_EXTENSION = ".log"; - private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION); - // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum) static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; @@ -71,6 +65,8 @@ public class CommitLogSegment private final MappedByteBuffer buffer; private boolean closed; + public final CommitLogDescriptor descriptor; + /** * @return a newly minted segment file */ @@ -87,7 +83,8 @@ public class CommitLogSegment CommitLogSegment(String filePath) { id = 
System.nanoTime(); - logFile = new File(DatabaseDescriptor.getCommitLogLocation(), FILENAME_PREFIX + id + FILENAME_EXTENSION); + descriptor = new CommitLogDescriptor(id); + logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); boolean isCreating = true; try @@ -127,37 +124,6 @@ public class CommitLogSegment } /** - * Extracts the commit log ID from filename - * - * @param filename the filename of the commit log file - * @return the extracted commit log ID - */ - public static long idFromFilename(String filename) - { - Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename); - try - { - if (matcher.matches()) - return Long.valueOf(matcher.group(1)); - else - return -1L; - } - catch (NumberFormatException e) - { - return -1L; - } - } - - /** - * @param filename the filename to check - * @return true if filename could be a commit log based on it's filename - */ - public static boolean possibleCommitLogFile(String filename) - { - return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); - } - - /** * Completely discards a segment file by deleting it. 
(Potentially blocking operation) */ public void discard(boolean deleteFile) http://git-wip-us.apache.org/repos/asf/cassandra/blob/57998976/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java index 5c52d07..983ba10 100644 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@ -25,11 +25,15 @@ import java.util.UUID; import java.util.zip.CRC32; import java.util.zip.Checksum; +import junit.framework.Assert; + import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogDescriptor; import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.net.MessagingService; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -190,7 +194,7 @@ public class CommitLogTest extends SchemaLoader protected File tmpFile() throws IOException { - File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null); + File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log"); logFile.deleteOnExit(); assert logFile.length() == 0; return logFile; @@ -204,4 +208,22 @@ public class CommitLogTest extends SchemaLoader //statics make it annoying to test things correctly CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ } + + @Test + public void testVersions() + { + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); + 
Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); + + Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").id); + + Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L).getMessagingVersion()); + String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; + Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + Assert.assertEquals(MessagingService.VERSION_11, CommitLogDescriptor.fromFileName("CommitLog-1340512736956320000.log").getMessagingVersion()); + } }
