http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
deleted file mode 100644
index 17980de..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.zip.CRC32;
-import javax.crypto.Cipher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.security.EncryptionUtils;
-import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
-
-/**
- * Read each sync section of a commit log, iteratively.
- * <p>
- * The constructor picks the {@link Segmenter}: encrypted if the descriptor's encryption context is enabled,
- * otherwise compressed if the descriptor has compression set, otherwise the no-op segmenter.
- */
-public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
-{
-    private final CommitLogDescriptor descriptor;
-    private final RandomAccessReader reader;
-    private final Segmenter segmenter;
-    private final boolean tolerateTruncation;
-
-    /**
-     * ending position of the current sync section.
-     */
-    protected int end;
-
-    protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
-    {
-        this.descriptor = descriptor;
-        this.reader = reader;
-        this.tolerateTruncation = tolerateTruncation;
-
-        end = (int) reader.getFilePointer();
-        if (descriptor.getEncryptionContext().isEnabled())
-            segmenter = new EncryptedSegmenter(reader, descriptor);
-        else if (descriptor.compression != null)
-            segmenter = new CompressedSegmenter(descriptor, reader);
-        else
-            segmenter = new NoOpSegmenter(reader);
-    }
-
-    public Iterator<SyncSegment> iterator()
-    {
-        return new SegmentIterator();
-    }
-
-    protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
-    {
-        /**
-         * Reads the sync marker at the current {@code end} and returns the section that follows it, or signals
-         * end of data when {@code readSyncMarker} returns -1. Read errors are handed to
-         * {@code CommitLogReplayer.handleReplayError}; if that call returns normally the loop runs again.
-         * NOTE(review): {@code end} is not advanced when {@code readSyncMarker} throws, so the retry starts
-         * from the same position -- confirm how {@code handleReplayError} terminates that case.
-         */
-        protected SyncSegment computeNext()
-        {
-            while (true)
-            {
-                try
-                {
-                    final int currentStart = end;
-                    end = readSyncMarker(descriptor, currentStart, reader);
-                    if (end == -1)
-                    {
-                        return endOfData();
-                    }
-                    if (end > reader.length())
-                    {
-                        // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
-                        // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
-                        end = (int) reader.length();
-                    }
-
-                    return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
-                }
-                catch(SegmentReader.SegmentReadException e)
-                {
-                    try
-                    {
-                        CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
-                    }
-                    catch (IOException ioe)
-                    {
-                        throw new RuntimeException(ioe);
-                    }
-                }
-                catch (IOException e)
-                {
-                    try
-                    {
-                        boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
-                        // if no exception is thrown, the while loop will continue
-                        CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
-                    }
-                    catch (IOException ioe)
-                    {
-                        throw new RuntimeException(ioe);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Reads and validates the sync marker at {@code offset}: an int end position followed by an int CRC, checked
-     * against a CRC32 over the two halves of {@code descriptor.id} and the reader position after seeking.
-     *
-     * @return the end position stored in the marker; -1 if fewer than {@code SYNC_MARKER_SIZE} bytes remain at
-     *         {@code offset}, or if the CRC does not match and both marker fields are zero
-     * @throws SegmentReadException if the CRC does not match and a marker field is non-zero, or if the CRC matches
-     *         but the end position is before {@code offset} or past the end of the file
-     * @throws IOException if reading the marker fails
-     */
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
-    {
-        if (offset > reader.length() - SYNC_MARKER_SIZE)
-        {
-            // There was no room in the segment to write a final header. No data could be present here.
-            return -1;
-        }
-        reader.seek(offset);
-        CRC32 crc = new CRC32();
-        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
-        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
-        updateChecksumInt(crc, (int) reader.getPosition());
-        final int end = reader.readInt();
-        long filecrc = reader.readInt() & 0xffffffffL;
-        if (crc.getValue() != filecrc)
-        {
-            if (end != 0 || filecrc != 0)
-            {
-                String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
-                             "The end of segment marker should be zero.", offset, reader.getPath());
-                throw new SegmentReadException(msg, true);
-            }
-            return -1;
-        }
-        else if (end < offset || end > reader.length())
-        {
-            String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
-            throw new SegmentReadException(msg, false);
-        }
-        return end;
-    }
-
-    /** Thrown when a sync marker fails validation; {@code invalidCrc} is true when the cause was a CRC mismatch. */
-    public static class SegmentReadException extends IOException
-    {
-        public final boolean invalidCrc;
-
-        public SegmentReadException(String msg, boolean invalidCrc)
-        {
-            super(msg);
-            this.invalidCrc = invalidCrc;
-        }
-    }
-
-    /** One section of the commit log, as produced by a {@link Segmenter}. */
-    public static class SyncSegment
-    {
-        /** the 'buffer' to replay commit log data from */
-        public final FileDataInput input;
-
-        /** offset in file where this section begins. */
-        public final int fileStartPosition;
-
-        /** offset in file where this section ends. */
-        public final int fileEndPosition;
-
-        /** the logical ending position of the buffer */
-        public final int endPosition;
-
-        public final boolean toleratesErrorsInSection;
-
-        public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
-        {
-            this.input = input;
-            this.fileStartPosition = fileStartPosition;
-            this.fileEndPosition = fileEndPosition;
-            this.endPosition = endPosition;
-            this.toleratesErrorsInSection = toleratesErrorsInSection;
-        }
-    }
-
-    /**
-     * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
-     */
-    interface Segmenter
-    {
-        /**
-         * Get the next section of the commit log to replay.
-         *
-         * @param startPosition the position in the file to begin reading at
-         * @param nextSectionStartPosition the file position of the beginning of the next section
-         * @return the buffer and its logical end position
-         * @throws IOException if the section cannot be read
-         */
-        SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
-
-        /**
-         * Determine if we tolerate errors in the current segment.
-         */
-        default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
-        {
-            return segmentEndPosition >= fileLength || segmentEndPosition < 0;
-        }
-    }
-
-    /** Segmenter that replays straight from the underlying reader and always tolerates segment errors. */
-    static class NoOpSegmenter implements Segmenter
-    {
-        private final RandomAccessReader reader;
-
-        public NoOpSegmenter(RandomAccessReader reader)
-        {
-            this.reader = reader;
-        }
-
-        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
-        {
-            reader.seek(startPosition);
-            return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
-        }
-
-        public boolean tolerateSegmentErrors(int end, long length)
-        {
-            return true;
-        }
-    }
-
-    /** Segmenter for sections stored as an int uncompressed length followed by the compressed bytes. */
-    static class CompressedSegmenter implements Segmenter
-    {
-        private final ICompressor compressor;
-        private final RandomAccessReader reader;
-        private byte[] compressedBuffer;
-        private byte[] uncompressedBuffer;
-        private long nextLogicalStart;
-
-        public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
-        {
-            this(CompressionParams.createCompressor(desc.compression), reader);
-        }
-
-        public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
-        {
-            this.compressor = compressor;
-            this.reader = reader;
-            compressedBuffer = new byte[0];
-            uncompressedBuffer = new byte[0];
-            nextLogicalStart = reader.getFilePointer();
-        }
-
-        public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
-        {
-            reader.seek(startPosition);
-            int uncompressedLength = reader.readInt();
-
-            int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
-            if (compressedLength > compressedBuffer.length)
-                compressedBuffer = new byte[(int) (1.2 * compressedLength)];
-            reader.readFully(compressedBuffer, 0, compressedLength);
-
-            if (uncompressedLength > uncompressedBuffer.length)
-                uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
-            int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
-            nextLogicalStart += SYNC_MARKER_SIZE;
-            FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
-            nextLogicalStart += uncompressedLength;
-            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
-        }
-    }
-
-    /** Segmenter for sections that start with an int plain-text length; chunks are decrypted, then uncompressed. */
-    static class EncryptedSegmenter implements Segmenter
-    {
-        private final RandomAccessReader reader;
-        private final ICompressor compressor;
-        private final Cipher cipher;
-
-        /**
-         * the result of the decryption is written into this buffer.
-         */
-        private ByteBuffer decryptedBuffer;
-
-        /**
-         * the result of the decompression is written into this buffer.
-         */
-        private ByteBuffer uncompressedBuffer;
-
-        private final ChunkProvider chunkProvider;
-
-        private long currentSegmentEndPosition;
-        private long nextLogicalStart;
-
-        public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
-        {
-            this(reader, descriptor.getEncryptionContext());
-        }
-
-        @VisibleForTesting
-        EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
-        {
-            this.reader = reader;
-            decryptedBuffer = ByteBuffer.allocate(0);
-            compressor = encryptionContext.getCompressor();
-            nextLogicalStart = reader.getFilePointer();
-
-            try
-            {
-                cipher = encryptionContext.getDecryptor();
-            }
-            catch (IOException ioe)
-            {
-                throw new FSReadError(ioe, reader.getPath());
-            }
-
-            chunkProvider = () -> {
-                if (reader.getFilePointer() >= currentSegmentEndPosition)
-                    return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                try
-                {
-                    decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
-                    uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
-                    return uncompressedBuffer;
-                }
-                catch (IOException e)
-                {
-                    throw new FSReadError(e, reader.getPath());
-                }
-            };
-        }
-
-        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
-        {
-            int totalPlainTextLength = reader.readInt();
-            currentSegmentEndPosition = nextSectionStartPosition - 1;
-
-            nextLogicalStart += SYNC_MARKER_SIZE;
-            FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
-            nextLogicalStart += totalPlainTextLength;
-            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
new file mode 100644
index 0000000..1c10c25
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A very simple ByteBuffer pool with a fixed allocation size and a cached max allocation count. Will allow
+ * you to go past the "max", freeing all buffers allocated beyond the max buffer count on release.
+ *
+ * Has a reusable thread local ByteBuffer that users can make use of.
+ */
+public class SimpleCachedBufferPool
+{
+    /** Per-thread reusable buffer, shared by all pool instances; starts out as a zero-capacity buffer. */
+    protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
+    {
+        @Override
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    /** Released buffers waiting to be handed out again. */
+    private final Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+    /** Number of buffers handed out by createBuffer and not yet released. */
+    private final AtomicInteger usedBuffers = new AtomicInteger(0);
+
+    /**
+     * Maximum number of buffers in the compression pool. Any buffers above this count that are allocated will be cleaned
+     * upon release rather than held and re-used.
+     */
+    private final int maxBufferPoolSize;
+
+    /**
+     * Size of individual buffer segments on allocation.
+     */
+    private final int bufferSize;
+
+    /**
+     * @param maxBufferPoolSize maximum number of released buffers kept for re-use
+     * @param bufferSize size of each newly allocated buffer
+     */
+    public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize)
+    {
+        this.maxBufferPoolSize = maxBufferPoolSize;
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Hands out a buffer, re-using a pooled one (cleared) when available and otherwise allocating a new one
+     * of size {@code bufferSize} via {@code bufferType}. A re-used buffer keeps the type it was created with.
+     *
+     * @param bufferType allocator consulted only when the pool is empty
+     * @return a cleared pooled buffer or a newly allocated one
+     */
+    public ByteBuffer createBuffer(BufferType bufferType)
+    {
+        usedBuffers.incrementAndGet();
+        ByteBuffer buf = bufferPool.poll();
+        if (buf != null)
+        {
+            buf.clear();
+            return buf;
+        }
+        return bufferType.allocate(bufferSize);
+    }
+
+    /** @return the calling thread's reusable buffer */
+    public ByteBuffer getThreadLocalReusableBuffer()
+    {
+        return reusableBufferHolder.get();
+    }
+
+    /** Replaces the calling thread's reusable buffer. */
+    public void setThreadLocalReusableBuffer(ByteBuffer buffer)
+    {
+        reusableBufferHolder.set(buffer);
+    }
+
+    /**
+     * Takes back a buffer obtained from {@link #createBuffer}: it is kept for re-use while fewer than
+     * {@code maxBufferPoolSize} buffers are pooled, and passed to {@code FileUtils.clean} otherwise.
+     */
+    public void releaseBuffer(ByteBuffer buffer)
+    {
+        usedBuffers.decrementAndGet();
+
+        if (bufferPool.size() < maxBufferPoolSize)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+    }
+
+    /** Empties the pool. The dropped buffers are not passed to {@code FileUtils.clean}. */
+    public void shutdown()
+    {
+        bufferPool.clear();
+    }
+
+    /** @return true if at least {@code maxBufferPoolSize} buffers are currently handed out */
+    public boolean atLimit()
+    {
+        return usedBuffers.get() >= maxBufferPoolSize;
+    }
+
+    @Override
+    public String toString()
+    {
+        return new StringBuilder()
+               .append("SimpleBufferPool:")
+               .append(" bufferCount:").append(usedBuffers.get())
+               .append(", maxBufferPoolSize:").append(maxBufferPoolSize)
+               .append(", bufferSize:").append(bufferSize)
+               .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index be1436c..f88877a 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -32,7 +32,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -199,7 +199,7 @@ public class Tracker public void reset() { view.set(new View( - !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore)) + !isDummy() ? 
ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfstore)) : ImmutableList.<Memtable>of(), ImmutableList.<Memtable>of(), Collections.<SSTableReader, SSTableReader>emptyMap(), @@ -293,7 +293,7 @@ public class Tracker /** * get the Memtable that the ordered writeOp should be directed to */ - public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition) + public Memtable getMemtableFor(OpOrder.Group opGroup, CommitLogPosition commitLogPosition) { // since any new memtables appended to the list after we fetch it will be for operations started // after us, we can safely assume that we will always find the memtable that 'accepts' us; @@ -304,7 +304,7 @@ public class Tracker // assign operations to a memtable that was retired/queued before we started) for (Memtable memtable : view.get().liveMemtables) { - if (memtable.accepts(opGroup, replayPosition)) + if (memtable.accepts(opGroup, commitLogPosition)) return memtable; } throw new AssertionError(view.get().liveMemtables.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/view/TableViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java index 15185f9..e4cdde3 100644 --- a/src/java/org/apache/cassandra/db/view/TableViews.java +++ b/src/java/org/apache/cassandra/db/view/TableViews.java @@ -28,7 +28,7 @@ import com.google.common.collect.PeekingIterator; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; @@ -94,7 +94,7 @@ public class TableViews extends AbstractCollection<View> 
viewCfs.dumpMemtable(); } - public void truncateBlocking(ReplayPosition replayAfter, long truncatedAt) + public void truncateBlocking(CommitLogPosition replayAfter, long truncatedAt) { for (ColumnFamilyStore viewCfs : allViewsCfs()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index bd73733..14bcd58 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.view; -import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; @@ -31,7 +30,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ViewDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.service.StorageService; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index db69f2f..d11e057 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -45,7 +45,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import 
org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java index 4561520..505de49 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java @@ -24,7 +24,7 @@ import java.util.*; import com.google.common.collect.Maps; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.Version; @@ -55,7 +55,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out); - ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out); + CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out); out.writeLong(stats.minTimestamp); out.writeLong(stats.maxTimestamp); out.writeInt(stats.maxLocalDeletionTime); @@ -72,7 +72,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer for (ByteBuffer value : stats.maxClusteringValues) ByteBufferUtil.writeWithShortLength(value, out); if (version.hasCommitLogLowerBound()) - 
ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out); + CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out); } /** @@ -94,8 +94,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer { EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in); EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in); - ReplayPosition commitLogLowerBound = ReplayPosition.NONE; - ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in); + CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE; + CommitLogPosition commitLogUpperBound = CommitLogPosition.serializer.deserialize(in); long minTimestamp = in.readLong(); long maxTimestamp = in.readLong(); int maxLocalDeletionTime = in.readInt(); @@ -120,7 +120,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer maxColumnNames.add(ByteBufferUtil.readWithShortLength(in)); if (descriptor.version.hasCommitLogLowerBound()) - commitLogLowerBound = ReplayPosition.serializer.deserialize(in); + commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); if (types.contains(MetadataType.VALIDATION)) components.put(MetadataType.VALIDATION, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index ca50a44..299bc87 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -29,7 +29,7 @@ import com.google.common.collect.Ordering; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; import 
org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.db.rows.Cell; @@ -66,8 +66,8 @@ public class MetadataCollector implements PartitionStatisticsCollector { return new StatsMetadata(defaultPartitionSizeHistogram(), defaultCellPerPartitionCountHistogram(), - ReplayPosition.NONE, - ReplayPosition.NONE, + CommitLogPosition.NONE, + CommitLogPosition.NONE, Long.MIN_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, @@ -88,8 +88,8 @@ public class MetadataCollector implements PartitionStatisticsCollector protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); // TODO: cound the number of row per partition (either with the number of cells, or instead) protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); - protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE; - protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE; + protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE; + protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE; protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); @@ -123,7 +123,7 @@ public class MetadataCollector implements PartitionStatisticsCollector { this(comparator); - ReplayPosition min = null, max = null; + CommitLogPosition min = null, max = null; for (SSTableReader sstable : sstables) { if (min == null) @@ -226,13 +226,13 @@ public class MetadataCollector implements PartitionStatisticsCollector ttlTracker.update(newTTL); } - 
public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound) + public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound) { this.commitLogLowerBound = commitLogLowerBound; return this; } - public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound) + public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound) { this.commitLogUpperBound = commitLogUpperBound; return this; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 07e35bb..e765235 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -26,7 +26,7 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -42,8 +42,8 @@ public class StatsMetadata extends MetadataComponent public final EstimatedHistogram estimatedPartitionSize; public final EstimatedHistogram estimatedColumnCount; - public final ReplayPosition commitLogLowerBound; - public final ReplayPosition commitLogUpperBound; + public final CommitLogPosition commitLogLowerBound; + public final CommitLogPosition commitLogUpperBound; public final long minTimestamp; public final long maxTimestamp; public final int minLocalDeletionTime; @@ 
-62,8 +62,8 @@ public class StatsMetadata extends MetadataComponent public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, - ReplayPosition commitLogLowerBound, - ReplayPosition commitLogUpperBound, + CommitLogPosition commitLogLowerBound, + CommitLogPosition commitLogUpperBound, long minTimestamp, long maxTimestamp, int minLocalDeletionTime, @@ -239,7 +239,7 @@ public class StatsMetadata extends MetadataComponent int size = 0; size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize); size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount); - size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound); + size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound); if (version.storeRows()) size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long) else @@ -258,7 +258,7 @@ public class StatsMetadata extends MetadataComponent if (version.storeRows()) size += 8 + 8; // totalColumnsSet, totalRows if (version.hasCommitLogLowerBound()) - size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound); + size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound); return size; } @@ -266,7 +266,7 @@ public class StatsMetadata extends MetadataComponent { EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out); EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out); - ReplayPosition.serializer.serialize(component.commitLogUpperBound, out); + CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out); out.writeLong(component.minTimestamp); out.writeLong(component.maxTimestamp); if (version.storeRows()) @@ -296,15 +296,15 @@ public class StatsMetadata extends MetadataComponent } if (version.hasCommitLogLowerBound()) - 
ReplayPosition.serializer.serialize(component.commitLogLowerBound, out); + CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out); } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException { EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in); EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in); - ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound; - commitLogUpperBound = ReplayPosition.serializer.deserialize(in); + CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE, commitLogUpperBound; + commitLogUpperBound = CommitLogPosition.serializer.deserialize(in); long minTimestamp = in.readLong(); long maxTimestamp = in.readLong(); // We use MAX_VALUE as that's the default value for "no deletion time" @@ -337,7 +337,7 @@ public class StatsMetadata extends MetadataComponent long totalRows = version.storeRows() ? in.readLong() : -1L; if (version.hasCommitLogLowerBound()) - commitLogLowerBound = ReplayPosition.serializer.deserialize(in); + commitLogLowerBound = CommitLogPosition.serializer.deserialize(in); return new StatsMetadata(partitionSizes, columnCounts, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java index 1da6ed0..08c1c8e 100644 --- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java @@ -17,11 +17,10 @@ */ package org.apache.cassandra.metrics; - import com.codahale.metrics.Gauge; import com.codahale.metrics.Timer; import org.apache.cassandra.db.commitlog.AbstractCommitLogService; -import org.apache.cassandra.db.commitlog.CommitLogSegmentManager; +import 
org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -42,14 +41,14 @@ public class CommitLogMetrics public final Timer waitingOnSegmentAllocation; /** The time spent waiting on CL sync; for Periodic this is only occurs when the sync is lagging its sync interval */ public final Timer waitingOnCommit; - + public CommitLogMetrics() { waitingOnSegmentAllocation = Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation")); waitingOnCommit = Metrics.timer(factory.createMetricName("WaitingOnCommit")); } - public void attach(final AbstractCommitLogService service, final CommitLogSegmentManager allocator) + public void attach(final AbstractCommitLogService service, final AbstractCommitLogSegmentManager segmentManager) { completedTasks = Metrics.register(factory.createMetricName("CompletedTasks"), new Gauge<Long>() { @@ -69,7 +68,7 @@ public class CommitLogMetrics { public Long getValue() { - return allocator.onDiskSize(); + return segmentManager.onDiskSize(); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 4dc273a..dd0bb46 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -116,6 +116,7 @@ public final class SchemaKeyspace + "min_index_interval int," + "read_repair_chance double," + "speculative_retry text," + + "cdc boolean," + "PRIMARY KEY ((keyspace_name), table_name))"); private static final CFMetaData Columns = @@ -179,6 +180,7 @@ public final class SchemaKeyspace + "min_index_interval int," + "read_repair_chance double," + "speculative_retry text," + + "cdc boolean," + "PRIMARY KEY ((keyspace_name), 
view_name))"); private static final CFMetaData Indexes = @@ -508,7 +510,8 @@ public final class SchemaKeyspace .frozenMap("caching", params.caching.asMap()) .frozenMap("compaction", params.compaction.asMap()) .frozenMap("compression", params.compression.asMap()) - .frozenMap("extensions", params.extensions); + .frozenMap("extensions", params.extensions) + .add("cdc", params.cdc); } public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace, @@ -986,6 +989,7 @@ public final class SchemaKeyspace .readRepairChance(row.getDouble("read_repair_chance")) .crcCheckChance(row.getDouble("crc_check_chance")) .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) + .cdc(row.has("cdc") ? row.getBoolean("cdc") : false) .build(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/schema/TableParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java index 7e44e73..16b4427 100644 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -25,6 +25,7 @@ import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; import org.apache.cassandra.exceptions.ConfigurationException; + import static java.lang.String.format; public final class TableParams @@ -47,7 +48,8 @@ public final class TableParams MIN_INDEX_INTERVAL, READ_REPAIR_CHANCE, SPECULATIVE_RETRY, - CRC_CHECK_CHANCE; + CRC_CHECK_CHANCE, + CDC; @Override public String toString() @@ -81,6 +83,7 @@ public final class TableParams public final CompactionParams compaction; public final CompressionParams compression; public final ImmutableMap<String, ByteBuffer> extensions; + public final boolean cdc; private TableParams(Builder builder) { @@ -101,6 +104,7 @@ public final class TableParams compaction = 
builder.compaction; compression = builder.compression; extensions = builder.extensions; + cdc = builder.cdc; } public static Builder builder() @@ -124,7 +128,8 @@ public final class TableParams .minIndexInterval(params.minIndexInterval) .readRepairChance(params.readRepairChance) .speculativeRetry(params.speculativeRetry) - .extensions(params.extensions); + .extensions(params.extensions) + .cdc(params.cdc); } public void validate() @@ -212,7 +217,8 @@ public final class TableParams && caching.equals(p.caching) && compaction.equals(p.compaction) && compression.equals(p.compression) - && extensions.equals(p.extensions); + && extensions.equals(p.extensions) + && cdc == p.cdc; } @Override @@ -232,7 +238,8 @@ public final class TableParams caching, compaction, compression, - extensions); + extensions, + cdc); } @Override @@ -254,6 +261,7 @@ public final class TableParams .add(Option.COMPACTION.toString(), compaction) .add(Option.COMPRESSION.toString(), compression) .add(Option.EXTENSIONS.toString(), extensions) + .add(Option.CDC.toString(), cdc) .toString(); } @@ -274,6 +282,7 @@ public final class TableParams private CompactionParams compaction = CompactionParams.DEFAULT; private CompressionParams compression = CompressionParams.DEFAULT; private ImmutableMap<String, ByteBuffer> extensions = ImmutableMap.of(); + private boolean cdc; public Builder() { @@ -368,6 +377,12 @@ public final class TableParams return this; } + public Builder cdc(boolean val) + { + cdc = val; + return this; + } + public Builder extensions(Map<String, ByteBuffer> val) { extensions = ImmutableMap.copyOf(val); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index f1d6bb9..2d21bff 100644 --- 
a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -310,10 +310,10 @@ public class CassandraDaemon logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)"); } - // replay the log if necessary + // Replay any CommitLogSegments found on disk try { - CommitLog.instance.recover(); + CommitLog.instance.recoverSegmentsOnDisk(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 081030d..66990c9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -133,6 +133,7 @@ public class StreamReceiveTask extends StreamTask public void run() { boolean hasViews = false; + boolean hasCDC = false; ColumnFamilyStore cfs = null; try { @@ -147,16 +148,22 @@ public class StreamReceiveTask extends StreamTask } cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right)); + hasCDC = cfs.metadata.params.cdc; Collection<SSTableReader> readers = task.sstables; try (Refs<SSTableReader> refs = Refs.ref(readers)) { - //We have a special path for views. - //Since the view requires cleaning up any pre-existing state, we must put - //all partitions through the same write path as normal mutations. - //This also ensures any 2is are also updated - if (hasViews) + /* + * We have a special path for views and for CDC. + * + * For views, since the view requires cleaning up any pre-existing state, we must put all partitions + * through the same write path as normal mutations. This also ensures any 2is are also updated. 
+ * + * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they + * can be archived by the CDC process on discard. + */ + if (hasViews || hasCDC) { for (SSTableReader reader : readers) { @@ -166,8 +173,17 @@ public class StreamReceiveTask extends StreamTask { try (UnfilteredRowIterator rowIterator = scanner.next()) { - //Apply unsafe (we will flush below before transaction is done) - new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe(); + Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))); + + // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below + // before transaction is done. + // + // If the CFS has CDC, however, these updates need to be written to the CommitLog + // so they get archived into the cdc_raw folder + if (hasCDC) + m.apply(); + else + m.applyUnsafe(); } } } @@ -218,9 +234,9 @@ public class StreamReceiveTask extends StreamTask } finally { - //We don't keep the streamed sstables since we've applied them manually - //So we abort the txn and delete the streamed sstables - if (hasViews) + // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete + // the streamed sstables. + if (hasViews || hasCDC) { if (cfs != null) cfs.forceBlockingFlush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java new file mode 100644 index 0000000..aa7898c --- /dev/null +++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.ImmutableSet; + +import static com.google.common.collect.Sets.newHashSet; + +/** + * Walks directory recursively, summing up total contents of files within. 
+ */ +public class DirectorySizeCalculator extends SimpleFileVisitor<Path> +{ + protected final AtomicLong size = new AtomicLong(0); + protected Set<String> visited = newHashSet(); //count each file only once + protected Set<String> alive = newHashSet(); + protected final File path; + + public DirectorySizeCalculator(File path) + { + super(); + this.path = path; + rebuildFileList(); + } + + public DirectorySizeCalculator(List<File> files) + { + super(); + this.path = null; + ImmutableSet.Builder<String> builder = ImmutableSet.builder(); + for (File file : files) + builder.add(file.getName()); + alive = builder.build(); + } + + public boolean isAcceptable(Path file) + { + return true; + } + + public void rebuildFileList() + { + assert path != null; + ImmutableSet.Builder<String> builder = ImmutableSet.builder(); + for (File file : path.listFiles()) + builder.add(file.getName()); + size.set(0); + alive = builder.build(); + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException + { + if (isAcceptable(file)) + { + size.addAndGet(attrs.size()); + visited.add(file.toFile().getName()); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException + { + return FileVisitResult.CONTINUE; + } + + public long getAllocatedSize() + { + return size.get(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 7474a5e..e1a109a 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -81,7 +81,8 @@ public final class JVMStabilityInspector { logger.error("Exiting due to 
error while processing commit log during initialization.", t); killer.killCurrentJVM(t, true); - } else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) + } + else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die) killer.killCurrentJVM(t); else inspectThrowable(t); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/src/java/org/apache/cassandra/utils/memory/BufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java index e7b299b..3458c62 100644 --- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -226,7 +226,7 @@ public class BufferPool if (DISABLED) logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); else - logger.info("Global buffer pool is enabled, when pool is exahusted (max is {}) it will allocate {}", + logger.info("Global buffer pool is enabled, when pool is exhausted (max is {}) it will allocate {}", FBUtilities.prettyPrintMemory(MEMORY_USAGE_THRESHOLD), ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? 
"on heap" : "off heap"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/conf/cassandra-murmur.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra-murmur.yaml b/test/conf/cassandra-murmur.yaml index 00f8b4c..a4b25ba 100644 --- a/test/conf/cassandra-murmur.yaml +++ b/test/conf/cassandra-murmur.yaml @@ -8,6 +8,8 @@ commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 commitlog_segment_size_in_mb: 5 commitlog_directory: build/test/cassandra/commitlog +cdc_raw_directory: build/test/cassandra/cdc_raw +cdc_enabled: false hints_directory: build/test/cassandra/hints partitioner: org.apache.cassandra.dht.Murmur3Partitioner listen_address: 127.0.0.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index eb03d17..cf02634 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -9,6 +9,8 @@ commitlog_sync: batch commitlog_sync_batch_window_in_ms: 1.0 commitlog_segment_size_in_mb: 5 commitlog_directory: build/test/cassandra/commitlog +cdc_raw_directory: build/test/cassandra/cdc_raw +cdc_enabled: false hints_directory: build/test/cassandra/hints partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner listen_address: 127.0.0.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/conf/cdc.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cdc.yaml b/test/conf/cdc.yaml new file mode 100644 index 0000000..f79930a --- /dev/null +++ b/test/conf/cdc.yaml @@ -0,0 +1 @@ +cdc_enabled: true http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/data/bloom-filter/ka/foo.cql ---------------------------------------------------------------------- diff --git a/test/data/bloom-filter/ka/foo.cql b/test/data/bloom-filter/ka/foo.cql index 
c4aed6a..4926e3a 100644 --- a/test/data/bloom-filter/ka/foo.cql +++ b/test/data/bloom-filter/ka/foo.cql @@ -59,6 +59,6 @@ Compression ratio: 0.4 Estimated droppable tombstones: 0.0 SSTable Level: 0 Repaired at: 0 -ReplayPosition(segmentId=1428529465658, position=6481) +CommitLogPosition(segmentId=1428529465658, position=6481) Estimated tombstone drop times:%n http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 0474b32..04682fd 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -21,41 +21,23 @@ package org.apache.cassandra.db.commitlog; * */ -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.RateLimiter; +import org.junit.*; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.*; import org.apache.cassandra.config.Config.CommitLogSync; -import org.apache.cassandra.config.DatabaseDescriptor; -import 
org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.*; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.SerializationHelper; -import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.security.EncryptionContext; @@ -131,7 +113,7 @@ public class CommitLogStressTest volatile boolean stop = false; boolean randomSize = false; boolean discardedRun = false; - ReplayPosition discardedPos; + CommitLogPosition discardedPos; @BeforeClass static public void initialize() throws IOException @@ -151,7 +133,7 @@ public class CommitLogStressTest } @Before - public void cleanDir() + public void cleanDir() throws IOException { File dir = new File(location); if (dir.isDirectory()) @@ -217,12 +199,22 @@ public class CommitLogStressTest { DatabaseDescriptor.setCommitLogCompression(compression); DatabaseDescriptor.setEncryptionContext(encryptionContext); - for (CommitLogSync sync : CommitLogSync.values()) + + String originalDir = DatabaseDescriptor.getCommitLogLocation(); + try + { + DatabaseDescriptor.setCommitLogLocation(location); + for (CommitLogSync sync : CommitLogSync.values()) + { + DatabaseDescriptor.setCommitLogSync(sync); + CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start(); + testLog(commitLog); + assert !failed; + } + } + finally { - DatabaseDescriptor.setCommitLogSync(sync); - CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start(); - testLog(commitLog); - assert !failed; + 
DatabaseDescriptor.setCommitLogLocation(originalDir); } } @@ -234,12 +226,12 @@ public class CommitLogStressTest commitLog.executor.getClass().getSimpleName(), randomSize ? " random size" : "", discardedRun ? " with discarded run" : ""); - commitLog.allocator.enableReserveSegmentCreation(); + CommitLog.instance.segmentManager.enableReserveSegmentCreation(); final List<CommitlogThread> threads = new ArrayList<>(); ScheduledExecutorService scheduled = startThreads(commitLog, threads); - discardedPos = ReplayPosition.NONE; + discardedPos = CommitLogPosition.NONE; if (discardedRun) { // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations. @@ -251,13 +243,12 @@ public class CommitLogStressTest for (CommitlogThread t: threads) { t.join(); - if (t.rp.compareTo(discardedPos) > 0) - discardedPos = t.rp; + if (t.clsp.compareTo(discardedPos) > 0) + discardedPos = t.clsp; } verifySizes(commitLog); - commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, - discardedPos); + commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos); threads.clear(); System.out.format("Discarded at %s\n", discardedPos); @@ -285,26 +276,28 @@ public class CommitLogStressTest System.out.println("Stopped. Replaying... "); System.out.flush(); - Replayer repl = new Replayer(commitLog); + Reader reader = new Reader(); File[] files = new File(location).listFiles(); - repl.recover(files); + + DummyHandler handler = new DummyHandler(); + reader.readAllFiles(handler, files); for (File f : files) if (!f.delete()) Assert.fail("Failed to delete " + f); - if (hash == repl.hash && cells == repl.cells) + if (hash == reader.hash && cells == reader.cells) System.out.format("Test success. 
compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n", commitLog.configuration.getCompressorName(), commitLog.configuration.useEncryption(), - repl.discarded, repl.skipped); + reader.discarded, reader.skipped); else { System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n", commitLog.configuration.getCompressorName(), commitLog.configuration.useEncryption(), - repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped, - repl.hash, hash); + reader.cells, cells, cells - reader.cells, reader.discarded, reader.skipped, + reader.hash, hash); failed = true; } } @@ -318,16 +311,16 @@ public class CommitLogStressTest // FIXME: The executor should give us a chance to await completion of the sync we requested. commitLog.executor.requestExtraSync().awaitUninterruptibly(); // Wait for any pending deletes or segment allocations to complete. - commitLog.allocator.awaitManagementTasksCompletion(); + CommitLog.instance.segmentManager.awaitManagementTasksCompletion(); long combinedSize = 0; - for (File f : new File(commitLog.location).listFiles()) + for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) combinedSize += f.length(); Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize()); List<String> logFileNames = commitLog.getActiveSegmentNames(); Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios(); - Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments(); + Collection<CommitLogSegment> segments = CommitLog.instance.segmentManager.getActiveSegments(); for (CommitLogSegment segment : segments) { @@ -419,7 +412,7 @@ public class CommitLogStressTest final CommitLog commitLog; final Random random; - volatile ReplayPosition rp; + volatile CommitLogPosition clsp; public CommitlogThread(CommitLog commitLog, Random rand) { @@ -448,34 +441,35 @@ public class 
CommitLogStressTest dataSize += sz; } - rp = commitLog.add(new Mutation(builder.build())); + Keyspace ks = Keyspace.open("Keyspace1"); + clsp = commitLog.add(new Mutation(builder.build())); counter.incrementAndGet(); } } } - class Replayer extends CommitLogReplayer + class Reader extends CommitLogReader { - Replayer(CommitLog log) - { - super(log, discardedPos, null, ReplayFilter.create()); - } - int hash; int cells; int discarded; int skipped; @Override - void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) + protected void readMutation(CommitLogReadHandler handler, + byte[] inputBuffer, + int size, + CommitLogPosition minPosition, + final int entryLocation, + final CommitLogDescriptor desc) throws IOException { - if (desc.id < discardedPos.segment) + if (desc.id < discardedPos.segmentId) { System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation); discarded++; return; } - else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position) + else if (desc.id == discardedPos.segmentId && entryLocation <= discardedPos.position) { // Skip over this mutation. 
skipped++; @@ -516,4 +510,13 @@ public class CommitLogStressTest } } } + + class DummyHandler implements CommitLogReadHandler + { + public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException { return false; } + + public void handleUnrecoverableError(CommitLogReadException exception) throws IOException { } + + public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java new file mode 100644 index 0000000..a653c81 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.DirectorySizeCalculator; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 1) +@Measurement(iterations = 30) +@Fork(value = 1,jvmArgsAppend = "-Xmx512M") +@Threads(1) +@State(Scope.Benchmark) +public class DirectorySizerBench +{ + private File tempDir; + private DirectorySizeCalculator sizer; + + @Setup(Level.Trial) + public void setUp() throws IOException + { + tempDir = Files.createTempDirectory(randString()).toFile(); + + // Since #'s on laptops and commodity desktops are so useful in considering enterprise virtualized server environments... 
+ + // Spinning disk 7200rpm 1TB, win10, ntfs, i6600 skylake, 256 files: + // [java] Result: 0.581 ±(99.9%) 0.003 ms/op [Average] + // [java] Statistics: (min, avg, max) = (0.577, 0.581, 0.599), stdev = 0.005 + // [java] Confidence interval (99.9%): [0.577, 0.584] + + // Same hardware, 25600 files: + // [java] Result: 56.990 ±(99.9%) 0.374 ms/op [Average] + // [java] Statistics: (min, avg, max) = (56.631, 56.990, 59.829), stdev = 0.560 + // [java] Confidence interval (99.9%): [56.616, 57.364] + + // #'s on a rmbp, 2014, SSD, ubuntu 15.10, ext4, i7-4850HQ @ 2.3, 25600 samples + // [java] Result: 74.714 ±(99.9%) 0.558 ms/op [Average] + // [java] Statistics: (min, avg, max) = (73.687, 74.714, 76.872), stdev = 0.835 + // [java] Confidence interval (99.9%): [74.156, 75.272] + + // Throttle CPU on the Windows box to .87GHZ from 4.3GHZ turbo single-core, and #'s for 25600: + // [java] Result: 298.628 ±(99.9%) 14.755 ms/op [Average] + // [java] Statistics: (min, avg, max) = (291.245, 298.628, 412.881), stdev = 22.085 + // [java] Confidence interval (99.9%): [283.873, 313.383] + + // Test w/25,600 files, 100x the load of a full default CommitLog (8192) divided by size (32 per) + populateRandomFiles(tempDir, 25600); + sizer = new DirectorySizeCalculator(tempDir); + } + + @TearDown + public void tearDown() + { + FileUtils.deleteRecursive(tempDir); + } + + private void populateRandomFiles(File dir, int count) throws IOException + { + for (int i = 0; i < count; i++) + { + PrintWriter pw = new PrintWriter(dir + File.separator + randString(), "UTF-8"); + pw.write(randString()); + pw.close(); + } + } + + private String randString() + { + return UUID.randomUUID().toString(); + } + + @Benchmark + public void countFiles(final Blackhole bh) throws IOException + { + sizer.rebuildFileList(); + Files.walkFileTree(tempDir.toPath(), sizer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java index 3bdb192..0047f48 100644 --- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java +++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java @@ -47,17 +47,20 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader { Config config = super.loadConfig(); + String sep = File.pathSeparator; config.rpc_port += offset; config.native_transport_port += offset; config.storage_port += offset; - config.commitlog_directory += File.pathSeparator + offset; - config.saved_caches_directory += File.pathSeparator + offset; - config.hints_directory += File.pathSeparator + offset; - for (int i = 0; i < config.data_file_directories.length; i++) - config.data_file_directories[i] += File.pathSeparator + offset; + config.commitlog_directory += sep + offset; + config.saved_caches_directory += sep + offset; + config.hints_directory += sep + offset; + + config.cdc_raw_directory += sep + offset; + for (int i = 0; i < config.data_file_directories.length; i++) + config.data_file_directories[i] += sep + offset; return config; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java index dd5444f..d4e621f 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java @@ -40,7 +40,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RowUpdateBuilder; import 
org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -248,7 +248,7 @@ public class BatchlogManagerTest if (i == 500) SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2), timestamp, - ReplayPosition.NONE); + CommitLogPosition.NONE); // Adjust the timestamp (slightly) to make the test deterministic. if (i >= 500) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java new file mode 100644 index 0000000..632c290 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3; + +import org.junit.Assert; +import org.junit.Test; + +public class CDCStatementTest extends CQLTester +{ + @Test + public void testEnableOnCreate() throws Throwable + { + createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;"); + Assert.assertTrue(currentTableMetadata().params.cdc); + } + + @Test + public void testEnableOnAlter() throws Throwable + { + createTable("CREATE TABLE %s (key text, val int, primary key(key));"); + Assert.assertFalse(currentTableMetadata().params.cdc); + execute("ALTER TABLE %s WITH cdc = true;"); + Assert.assertTrue(currentTableMetadata().params.cdc); + } + + @Test + public void testDisableOnAlter() throws Throwable + { + createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;"); + Assert.assertTrue(currentTableMetadata().params.cdc); + execute("ALTER TABLE %s WITH cdc = false;"); + Assert.assertFalse(currentTableMetadata().params.cdc); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index bca9e7b..8dac72e 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -193,6 +193,10 @@ public abstract class CQLTester FileUtils.deleteRecursive(dir); } + File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation()); + if (cdcDir.exists()) + FileUtils.deleteRecursive(cdcDir); + cleanupSavedCaches(); // clean up data directory which are stored as data directory/keyspace/data files http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java index 1527b1e..fd7afd9 100644 --- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java +++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java @@ -149,7 +149,7 @@ public class OutOfSpaceTest extends CQLTester // Make sure commit log wasn't discarded. UUID cfid = currentTableMetadata().cfId; - for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments()) + for (CommitLogSegment segment : CommitLog.instance.segmentManager.getActiveSegments()) if (segment.getDirtyCFIDs().contains(cfid)) return; fail("Expected commit log to remain dirty for the affected table."); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java index 33a41d8..8f92403 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java @@ -23,7 +23,6 @@ import java.util.UUID; import org.junit.Test; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.CQLTester; @@ -36,10 +35,10 @@ import org.apache.cassandra.triggers.ITrigger; import org.apache.cassandra.utils.ByteBufferUtil; import static java.lang.String.format; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.fail; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; public class CreateTest extends CQLTester { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e31e2162/test/unit/org/apache/cassandra/db/ReadMessageTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java index 6dafa37..4047cc9 100644 --- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java +++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java @@ -43,6 +43,8 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; public class ReadMessageTest @@ -56,6 +58,10 @@ public class ReadMessageTest @BeforeClass public static void defineSchema() throws ConfigurationException { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + CFMetaData cfForReadMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_FOR_READ_TEST) .addPartitionKey("key", BytesType.instance) .addClusteringColumn("col1", AsciiType.instance) @@ -195,7 +201,9 @@ public class ReadMessageTest Checker checker = new Checker(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit1")), cfsnocommit.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit2"))); - CommitLogTestReplayer.examineCommitLog(checker); + + CommitLogTestReplayer replayer = new CommitLogTestReplayer(checker); + replayer.examineCommitLog(); assertTrue(checker.commitLogMessageFound); assertFalse(checker.noCommitLogMessageFound);
