Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77abf868 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77abf868 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77abf868 Branch: refs/heads/trunk Commit: 77abf868a4f60f6978c8d3e334c1a2275c4c37a3 Parents: ebefc96 be21174 Author: Blake Eggleston <[email protected]> Authored: Fri Sep 29 15:33:44 2017 -0700 Committer: Blake Eggleston <[email protected]> Committed: Fri Sep 29 15:34:44 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLogReader.java | 48 +++++++++++++++++- .../db/commitlog/CommitLogReplayer.java | 7 ++- .../db/commitlog/CommitLogSegment.java | 2 +- .../db/commitlog/CompressedSegment.java | 2 +- .../db/commitlog/EncryptedSegment.java | 4 +- .../db/commitlog/MemoryMappedSegment.java | 2 +- .../cassandra/db/commitlog/CommitLogTest.java | 52 ++++++++++++++++++++ 8 files changed, 109 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 99b5a59,a782333..1495c5d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -255,6 -115,6 +255,7 @@@ Merged from 2.1 * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835) Merged from 3.0: ++ * Filter header only commit logs before recovery (CASSANDRA-13918) * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172) * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004) * Failed unregistering mbean during drop keyspace (CASSANDRA-13346) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 864325b,4d74557..75ef8e9 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@@ -35,7 -34,8 +35,8 @@@ import org.apache.cassandra.db.commitlo import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.exceptions.UnknownTableException; + import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.RandomAccessReader; @@@ -78,6 -77,48 +79,44 @@@ public class CommitLogReade readAllFiles(handler, files, CommitLogPosition.NONE); } + private static boolean shouldSkip(File file) throws IOException, ConfigurationException + { + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); - if (desc.version < CommitLogDescriptor.VERSION_21) - { - return false; - } + try(RandomAccessReader reader = RandomAccessReader.open(file)) + { + CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); + int end = reader.readInt(); + long filecrc = reader.readInt() & 0xffffffffL; + return end == 0 && filecrc == 0; + } + } + - private static List<File> filterCommitLogFiles(File[] toFilter) ++ static List<File> filterCommitLogFiles(File[] toFilter) + { + List<File> filtered = new ArrayList<>(toFilter.length); + for (File file: toFilter) + { + try + { + if (shouldSkip(file)) + { + logger.info("Skipping playback of empty log: {}", 
file.getName()); + } + else + { + filtered.add(file); + } + } + catch (Exception e) + { + // let recover deal with it + filtered.add(file); + } + } + + return filtered; + } + /** * Reads all passed in files with minPosition, no start, and no mutation limit. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index d1e63e6,ea62fd8..7cb277e --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -144,44 -134,9 +144,47 @@@ public class CommitLogReplayer implemen public void replayFiles(File[] clogs) throws IOException { - for (int i = 0; i < clogs.length; i++) - commitLogReader.readAllFiles(this, clogs, globalPosition); ++ List<File> filteredLogs = CommitLogReader.filterCommitLogFiles(clogs); ++ int i = 0; ++ for (File file: filteredLogs) + { ++ i++; + sawCDCMutation = false; - commitLogReader.readCommitLogSegment(this, clogs[i], globalPosition, i == clogs.length - 1); ++ commitLogReader.readCommitLogSegment(this, file, globalPosition, i == filteredLogs.size()); + if (sawCDCMutation) + handleCDCReplayCompletion(clogs[i]); + } } + + /** + * Upon replay completion, CDC needs to hard-link files in the CDC folder and calculate index files so consumers can + * begin their work. + */ + private void handleCDCReplayCompletion(File f) throws IOException + { + // Can only reach this point if CDC is enabled, thus we have a CDCSegmentManager + ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).addCDCSize(f.length()); + + File dest = new File(DatabaseDescriptor.getCDCLogLocation(), f.getName()); + + // If hard link already exists, assume it's from a previous node run. If people are mucking around in the cdc_raw + // directory that's on them. 
+ if (!dest.exists()) + FileUtils.createHardLink(f, dest); + + // The reader has already verified we can deserialize the descriptor. + CommitLogDescriptor desc; + try(RandomAccessReader reader = RandomAccessReader.open(f)) + { + desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); + assert desc != null; + assert f.length() < Integer.MAX_VALUE; + CommitLogSegment.writeCDCIndexFile(desc, (int)f.length(), true); + } + } + + /** + * Flushes all keyspaces associated with this replayer in parallel, blocking until their flushes are complete. + * @return the number of mutations replayed http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 956a05d,267813e..1060a72 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@@ -156,16 -156,74 +157,67 @@@ public class CommitLogTes } @Test - public void testRecoveryWithFinalEmptyLog() throws Exception + public void testRecoveryWithEmptyFinalLog() throws Exception { - // Even though it's empty, it's the last commitlog segment, so allowTruncation=true should allow it to pass - CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)}); + CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version)); } + /** + * Since commit log segments can be allocated before they're needed, the commit log file with the highest + * id isn't necessarily the last log that we wrote to. 
We should remove header only logs on recover so we + * can tolerate truncated logs + */ + @Test + public void testHeaderOnlyFileFiltering() throws Exception + { + File directory = Files.createTempDir(); + + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext()); + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null, DatabaseDescriptor.getEncryptionContext()); + + ByteBuffer buffer; + + // this has a header and malformed data + File file1 = new File(directory, desc1.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc1); + int pos = buffer.position(); + CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128); + buffer.position(pos + 8); + buffer.putInt(5); + buffer.putInt(6); + + try (OutputStream lout = new FileOutputStream(file1)) + { + lout.write(buffer.array()); + } + + // this has only a header + File file2 = new File(directory, desc2.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc2); + try (OutputStream lout = new FileOutputStream(file2)) + { + lout.write(buffer.array()); + } + + // one corrupt file and one header only file should be ok + runExpecting(() -> { + CommitLog.instance.recoverFiles(file1, file2); + return null; + }, null); + + // 2 corrupt files and one header only file should fail + runExpecting(() -> { + CommitLog.instance.recoverFiles(file1, file1, file2); + return null; + }, CommitLogReplayException.class); + } @Test - public void testRecoveryWithEmptyLog20() throws Exception - { - CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.VERSION_20)); - } - - @Test public void testRecoveryWithZeroLog() throws Exception { - testRecovery(new byte[10], null); + testRecovery(new byte[10], CommitLogReplayException.class); } @Test 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
