merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2648047a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2648047a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2648047a Branch: refs/heads/trunk Commit: 2648047a443306c8f2fc921c353cdbd54d964c5f Parents: e93578b 7161aec Author: Jonathan Ellis <[email protected]> Authored: Wed Sep 18 12:44:31 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Sep 18 12:44:31 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/cql3/CQL.textile | 2 +- .../org/apache/cassandra/db/Directories.java | 62 +++++--------------- .../db/compaction/CompactionManager.java | 2 +- .../cassandra/db/compaction/Scrubber.java | 2 +- .../cassandra/io/util/DiskAwareRunnable.java | 2 +- .../cassandra/streaming/StreamReader.java | 2 +- .../cassandra/db/ColumnFamilyStoreTest.java | 7 +-- .../apache/cassandra/db/DirectoriesTest.java | 2 +- .../compaction/LegacyLeveledManifestTest.java | 2 +- .../io/sstable/SSTableSimpleWriterTest.java | 2 +- 11 files changed, 28 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 98ea03f,fb9915e..fd8d852 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,30 -1,5 +1,31 @@@ -1.2.10 +2.0.1 + * add file_cache_size_in_mb setting (CASSANDRA-5661) + * Improve error message when yaml contains invalid properties (CASSANDRA-5958) + * Improve leveled compaction's ability to find non-overlapping L0 compactions + to work on concurrently (CASSANDRA-5921) + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614) + * Log Merkle tree stats (CASSANDRA-2698) + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862) + 
* Improve offheap memcpy performance (CASSANDRA-5884) + * Use a range aware scanner for cleanup (CASSANDRA-2524) + * Cleanup doesn't need to inspect sstables that contain only local data + (CASSANDRA-5722) + * Add ability for CQL3 to list partition keys (CASSANDRA-4536) + * Improve native protocol serialization (CASSANDRA-5664) + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923) + * Require superuser status for adding triggers (CASSANDRA-5963) + * Make standalone scrubber handle old and new style leveled manifest + (CASSANDRA-6005) + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023) + * Fix paged ranges with multiple replicas (CASSANDRA-6004) + * Fix potential AssertionError during tracing (CASSANDRA-6041) + * Fix NPE in sstablesplit (CASSANDRA-6027) + * Migrate pre-2.0 key/value/column aliases to system.schema_columns + (CASSANDRA-6009) + * Paging filter empty rows too aggressively (CASSANDRA-6040) + * Support variadic parameters for IN clauses (CASSANDRA-4210) +Merged from 1.2: + * Avoid second-guessing out-of-space state (CASSANDRA-5605) * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982) * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002) * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/doc/cql3/CQL.textile ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Directories.java index 52f9699,351c0c0..18fc639 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@@ -19,9 -19,9 +19,10 @@@ package org.apache.cassandra.db import java.io.File; import java.io.FileFilter; + import java.io.IOError; import java.io.IOException; import java.util.*; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@@ -149,8 -147,15 +150,8 @@@ public class Directorie // retry after GCing has forced unmap of compacted SSTables so they can be deleted // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far SSTableDeletingTask.rescheduleFailedTasks(); - try - { - Thread.sleep(10000); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); - path = getLocationWithMaximumAvailableSpace(estimatedSize); + path = getWriteableLocationAsFile(); } return path; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 862f5a2,0000000..d72cb5e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -1,137 -1,0 +1,137 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Collection; +import java.util.UUID; + +import com.google.common.base.Throwables; +import com.ning.compress.lzf.LZFInputStream; + +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.messages.FileMessageHeader; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.BytesReadTracker; +import org.apache.cassandra.utils.Pair; + +/** + * StreamReader reads from stream and writes to SSTable. 
+ */ +public class StreamReader +{ + protected final UUID cfId; + protected final long estimatedKeys; + protected final Collection<Pair<Long, Long>> sections; + protected final StreamSession session; + protected final Descriptor.Version inputVersion; + + protected Descriptor desc; + + public StreamReader(FileMessageHeader header, StreamSession session) + { + this.session = session; + this.cfId = header.cfId; + this.estimatedKeys = header.estimatedKeys; + this.sections = header.sections; + this.inputVersion = new Descriptor.Version(header.version); + } + + /** + * @param channel where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ + public SSTableReader read(ReadableByteChannel channel) throws IOException + { + long totalSize = totalSize(); + + Pair<String, String> kscf = Schema.instance.getCF(cfId); + ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + + SSTableWriter writer = createWriter(cfs, totalSize); + DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel))); + BytesReadTracker in = new BytesReadTracker(dis); + try + { + while (in.getBytesRead() < totalSize) + { + writeRow(writer, in, cfs); + // TODO move this to BytesReadTracker + session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + } + return writer.closeAndOpenReader(); + } + catch (Throwable e) + { + writer.abort(); + drain(dis, in.getBytesRead()); + if (e instanceof IOException) + throw (IOException) e; + else + throw Throwables.propagate(e); + } + } + + protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException + { - Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize); ++ Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(); + if (localDir == null) + throw new IOException("Insufficient disk space to 
store " + totalSize + " bytes"); + desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir))); + + return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys); + } + + protected void drain(InputStream dis, long bytesRead) throws IOException + { + long toSkip = totalSize() - bytesRead; + toSkip = toSkip - dis.skip(toSkip); + while (toSkip > 0) + toSkip = toSkip - dis.skip(toSkip); + } + + protected long totalSize() + { + long size = 0; + for (Pair<Long, Long> section : sections) + size += section.right - section.left; + return size; + } + + protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException + { + DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); + writer.appendFromStream(key, cfs.metadata, in, inputVersion); + cfs.invalidateCachedRow(key); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index f21b60e,abe3f05..0a78b2a --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@@ -1334,66 -1240,12 +1334,65 @@@ public class ColumnFamilyStoreTest exte testMultiRangeSlicesBehavior(prepareMultiRangeSlicesTest(10, false)); } + @Test + public void testRemoveUnifinishedCompactionLeftovers() throws Throwable + { + String ks = "Keyspace1"; + String cf = "Standard3"; // should be empty + + final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); + Directories dir = Directories.create(ks, cf); + ByteBuffer key = bytes("key"); + + // 1st sstable - SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), - cfmeta, StorageService.getPartitioner()); ++ SSTableSimpleWriter writer = new 
SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner()); + writer.newRow(key); + writer.addColumn(bytes("col"), bytes("val"), 1); + writer.close(); + + Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list(); + assert sstables.size() == 1; + + Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next(); + final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey()); + + // simulate incomplete compaction - writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(100), ++ writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), + cfmeta, StorageService.getPartitioner()) + { + protected SSTableWriter getWriter() + { + SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator); + collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable + return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName), + 0, + metadata, + StorageService.getPartitioner(), + collector); + } + }; + writer.newRow(key); + writer.addColumn(bytes("col"), bytes("val"), 1); + writer.close(); + + // should have 2 sstables now + sstables = dir.sstableLister().list(); + assert sstables.size() == 2; + + ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation)); + + // 2nd sstable should be removed (only 1st sstable exists in set of size 1) + sstables = dir.sstableLister().list(); + assert sstables.size() == 1; + assert sstables.containsKey(sstable1.descriptor); + } + private ColumnFamilyStore prepareMultiRangeSlicesTest(int valueSize, boolean flush) throws Throwable { - String tableName = "Keyspace1"; + String keyspaceName = "Keyspace1"; String cfName = "Standard1"; - Table table = Table.open(tableName); - ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName); + Keyspace keyspace = Keyspace.open(keyspaceName); + ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(cfName); cfs.clearUnsafe(); String[] letters = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l" }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/DirectoriesTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java index d8e8af0,0000000..7fd6c10 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java @@@ -1,117 -1,0 +1,117 @@@ +package org.apache.cassandra.db.compaction; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMetadata; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Pair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.db.compaction.LegacyLeveledManifestTestHelper.*; + +public class LegacyLeveledManifestTest +{ + private final static String LEGACY_VERSION = "ic"; + + private File destDir; + @Before + public void setup() + { - destDir = Directories.create(KS, CF).getDirectoryForNewSSTables(0); ++ destDir = Directories.create(KS, CF).getDirectoryForNewSSTables(); + FileUtils.createDirectory(destDir); + for (File srcFile : getLegacySSTableDir(LEGACY_VERSION).listFiles()) + { + File destFile = new File(destDir, srcFile.getName()); + FileUtils.createHardLink(srcFile,destFile); + assert destFile.exists() : destFile.getAbsoluteFile(); + } + } + @After + public void tearDown() + { + FileUtils.deleteRecursive(destDir); + } + + @Test + public void migrateTest() throws IOException + { + assertTrue(LegacyLeveledManifest.manifestNeedsMigration(KS, CF)); + } + + @Test + public void doMigrationTest() throws IOException, InterruptedException + { + LegacyLeveledManifest.migrateManifests(KS, CF); + + for (int i = 0; i <= 2; i++) + { + Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db"); + SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor).left; + assertEquals(metadata.sstableLevel, i); + } + } + + /** + * Validate that the rewritten stats file is the same as the original one. 
+ * @throws IOException + */ + @Test + public void validateSSTableMetadataTest() throws IOException + { + Map<Descriptor, Pair<SSTableMetadata, Set<Integer>>> beforeMigration = new HashMap<>(); + for (int i = 0; i <= 2; i++) + { + Descriptor descriptor = Descriptor.fromFilename(destDir+File.separator+KS+"-"+CF+"-"+LEGACY_VERSION+"-"+i+"-Statistics.db"); + beforeMigration.put(descriptor, SSTableMetadata.serializer.deserialize(descriptor, false)); + } + + LegacyLeveledManifest.migrateManifests(KS, CF); + + for (Map.Entry<Descriptor, Pair<SSTableMetadata, Set<Integer>>> entry : beforeMigration.entrySet()) + { + Pair<SSTableMetadata, Set<Integer>> newMetaPair = SSTableMetadata.serializer.deserialize(entry.getKey()); + SSTableMetadata newMetadata = newMetaPair.left; + SSTableMetadata oldMetadata = entry.getValue().left; + assertEquals(newMetadata.estimatedRowSize, oldMetadata.estimatedRowSize); + assertEquals(newMetadata.estimatedColumnCount, oldMetadata.estimatedColumnCount); + assertEquals(newMetadata.replayPosition, oldMetadata.replayPosition); + assertEquals(newMetadata.minTimestamp, oldMetadata.minTimestamp); + assertEquals(newMetadata.maxTimestamp, oldMetadata.maxTimestamp); + assertEquals(newMetadata.compressionRatio, oldMetadata.compressionRatio, 0.01); + assertEquals(newMetadata.partitioner, oldMetadata.partitioner); + assertEquals(newMetadata.estimatedTombstoneDropTime, oldMetadata.estimatedTombstoneDropTime); + assertEquals(entry.getValue().right, newMetaPair.right); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2648047a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java index e80d2bb,ce569b9..9e7aa16 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableSimpleWriterTest.java @@@ -39,11 -39,11 
+39,11 @@@ public class SSTableSimpleWriterTest ex final int INC = 5; final int NBCOL = 10; - String tablename = "Keyspace1"; + String keyspaceName = "Keyspace1"; String cfname = "StandardInteger1"; - Table t = Table.open(tablename); // make sure we create the directory - File dir = Directories.create(tablename, cfname).getDirectoryForNewSSTables(); + Keyspace t = Keyspace.open(keyspaceName); // make sure we create the directory - File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables(0); ++ File dir = Directories.create(keyspaceName, cfname).getDirectoryForNewSSTables(); assert dir.exists(); IPartitioner partitioner = StorageService.getPartitioner();
