Modified: cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Feb 23 19:32:42 2011 @@ -58,7 +58,7 @@ public class PrecompactedRow extends Abs this.headerBuffer = new DataOutputBuffer(); } - public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore) + public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize) { super(rows.get(0).getKey()); buffer = new DataOutputBuffer(); @@ -71,7 +71,7 @@ public class PrecompactedRow extends Abs } boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, sstables); - if (rows.size() > 1 || shouldPurge) + if (rows.size() > 1 || shouldPurge || !rows.get(0).sstable.descriptor.isLatestVersion || forceDeserialize) { ColumnFamily cf = null; for (SSTableIdentityIterator row : rows)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Wed Feb 23 19:32:42 2011 @@ -89,8 +89,10 @@ public class CacheWriter<K, V> implement { out.close(); } + + path.delete(); // ignore error if it didn't exist if (!tmpFile.renameTo(path)) - throw new IOException("Unable to rename cache to " + path); + throw new IOException("Unable to rename " + tmpFile + " to " + path); logger.info(String.format("Saved %s (%d items) in %d ms", path.getName(), keys.size(), (System.currentTimeMillis() - start))); } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Wed Feb 23 19:32:42 2011 @@ -111,8 +111,7 @@ public class IndexHelper } /** - * the index of the IndexInfo in which @name will be found. - * If the index is @indexList.size(), the @name appears nowhere. + * The index of the IndexInfo in which a scan starting with @name should begin. 
* * @param name * name of the index @@ -133,19 +132,40 @@ public class IndexHelper if (name.remaining() == 0 && reversed) return indexList.size() - 1; IndexInfo target = new IndexInfo(name, name, 0, 0); - int index = Collections.binarySearch(indexList, target, getComparator(comparator)); - return index < 0 ? -1 * (index + 1) : index; + /* + Take the example from the unit test, and say your index looks like this: + [0..5][10..15][20..25] + and you look for the slice [13..17]. + + When doing forward slice, we are doing a binary search comparing 13 (the start of the query) + to the lastName part of the index slot. You'll end up with the "first" slot, going from left to right, + that may contain the start. + + When doing a reverse slice, we do the same thing, only using as a start column the end of the query, + i.e. 17 in this example, compared to the firstName part of the index slots. bsearch will give us the + first slot where firstName > start ([20..25] here), so we subtract an extra one to get the slot just before. + */ + int index = Collections.binarySearch(indexList, target, getComparator(comparator, reversed)); + return index < 0 ? -index - (reversed ? 2 : 1) : index; } - public static Comparator<IndexInfo> getComparator(final AbstractType nameComparator) + public static Comparator<IndexInfo> getComparator(final AbstractType nameComparator, boolean reversed) { - return new Comparator<IndexInfo>() - { - public int compare(IndexInfo o1, IndexInfo o2) - { - return nameComparator.compare(o1.lastName, o2.lastName); - } - }; + return reversed + ? 
new Comparator<IndexInfo>() + { + public int compare(IndexInfo o1, IndexInfo o2) + { + return nameComparator.compare(o1.firstName, o2.firstName); + } + } + : new Comparator<IndexInfo>() + { + public int compare(IndexInfo o1, IndexInfo o2) + { + return nameComparator.compare(o1.lastName, o2.lastName); + } + }; } public static class IndexInfo Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Wed Feb 23 19:32:42 2011 @@ -259,7 +259,7 @@ public abstract class SSTable @Override public String toString() { - return getClass().getName() + "(" + + return getClass().getSimpleName() + "(" + "path='" + getFilename() + '\'' + ')'; } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Feb 23 19:32:42 2011 @@ -24,15 +24,22 @@ package org.apache.cassandra.io.sstable; import java.io.DataOutput; import java.io.IOError; import java.io.IOException; +import java.util.ArrayList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.IColumn; import 
org.apache.cassandra.db.columniterator.IColumnIterator; import org.apache.cassandra.io.util.BufferedRandomAccessFile; +import org.apache.cassandra.utils.Filter; public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator { + private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class); + private final DecoratedKey key; private final long finishedAt; private final BufferedRandomAccessFile file; @@ -56,6 +63,12 @@ public class SSTableIdentityIterator imp public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize) throws IOException { + this(sstable, file, key, dataStart, dataSize, false); + } + + public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader) + throws IOException + { this.sstable = sstable; this.file = file; this.key = key; @@ -66,6 +79,28 @@ public class SSTableIdentityIterator imp try { file.seek(this.dataStart); + if (deserializeRowHeader) + { + try + { + IndexHelper.defreezeBloomFilter(file, sstable.descriptor.usesOldBloomFilter); + } + catch (Exception e) + { + logger.info("Invalid bloom filter in " + sstable + "; will rebuild it"); + // deFreeze should have left the file position ready to deserialize index + } + try + { + IndexHelper.deserializeIndex(file); + } + catch (Exception e) + { + logger.info("Invalid row summary in " + sstable + "; will rebuild it"); + } + file.seek(this.dataStart); + } + IndexHelper.skipBloomFilter(file); IndexHelper.skipIndex(file); columnFamily = sstable.createColumnFamily(); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1073896&r1=1073895&r2=1073896&view=diff 
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Feb 23 19:32:42 2011 @@ -97,7 +97,7 @@ public class SSTableReader extends SSTab * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created * later than maxDataAge. - * + * * The field is not serialized to disk, so relying on it for more than what truncate does is not advised. * * When a new sstable is flushed, maxDataAge is set to the time of creation. Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Wed Feb 23 19:32:42 2011 @@ -186,7 +186,7 @@ public class SSTableScanner implements I } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException(SSTableScanner.this + " failed to provide next columns from " + this, e); } } @@ -194,5 +194,21 @@ public class SSTableScanner implements I { throw new UnsupportedOperationException(); } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + + "finishedAt:" + finishedAt + + ")"; + } +} + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + + "file=" + file + + " sstable=" + sstable + + " exhausted=" + exhausted + + ")"; } } Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Wed Feb 23 19:32:42 2011 @@ -55,7 +55,7 @@ public class BufferedRandomAccessFile ex // `current` as current position in file // `bufferOffset` is the offset of the beginning of the buffer - // `bufferEnd` is `bufferOffset` + count of bytes read from file + // `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer private long bufferOffset, bufferEnd, current = 0; // max buffer size is set according to (int size) parameter in the @@ -196,7 +196,7 @@ public class BufferedRandomAccessFile ex buffer.clear(); bufferOffset = current; - if (bufferOffset > channel.size()) + if (bufferOffset >= channel.size()) { buffer.rewind(); bufferEnd = bufferOffset; @@ -259,9 +259,8 @@ public class BufferedRandomAccessFile ex } @Override - // -1 will be returned if EOF is reached, RandomAccessFile is responsible - // for - // throwing EOFException + // -1 will be returned if EOF is reached; higher-level methods like readInt + // or readFully (from RandomAccessFile) will throw EOFException but this should not public int read(byte[] buff, int offset, int length) throws IOException { int bytesCount = 0; @@ -282,7 +281,7 @@ public class BufferedRandomAccessFile ex private int readAtMost(byte[] buff, int offset, int length) throws IOException { - if (length >= bufferEnd && hitEOF) + if (length > bufferEnd && hitEOF) return -1; final int left = (int) maxBufferSize - buffer.position(); @@ -467,6 +466,11 @@ public class 
BufferedRandomAccessFile ex return (int) bytes; } + public static BufferedRandomAccessFile getUncachingReader(String filename) throws IOException + { + return new BufferedRandomAccessFile(new File(filename), "r", 8 * 1024 * 1024, true); + } + /** * Class to hold a mark to the position of the file */ @@ -479,4 +483,12 @@ public class BufferedRandomAccessFile ex this.pointer = pointer; } } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + + "filePath='" + filePath + "'" + + ", length=" + fileLength + + ", skipCache=" + skipCache + ")"; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Wed Feb 23 19:32:42 2011 @@ -157,4 +157,12 @@ public class MappedFileDataInput extends this.position = position; } } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + + "filename='" + filename + "'" + + ", position=" + position + + ")"; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java Wed Feb 23 19:32:42 2011 @@ -141,4 +141,11 @@ public abstract class SegmentedFile public void remove() { throw new UnsupportedOperationException(); } } + + 
@Override + public String toString() { + return getClass().getSimpleName() + "(path='" + path + "'" + + ", length=" + length + + ")"; +} } Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Feb 23 19:32:42 2011 @@ -25,7 +25,6 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ServerSocketChannel; -import java.security.MessageDigest; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -153,21 +152,6 @@ public final class MessagingService impl subscriber.receiveTiming(address, latency); } - public static byte[] hash(String type, byte data[]) - { - byte result[]; - try - { - MessageDigest messageDigest = MessageDigest.getInstance(type); - result = messageDigest.digest(data); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - return result; - } - /** called from gossiper when it notices a node is not responding. 
*/ public void convict(InetAddress ep) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Feb 23 19:32:42 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.service; import java.io.*; import java.net.InetAddress; import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -377,15 +376,7 @@ public class AntiEntropyService { validated++; // MerkleTree uses XOR internally, so we want lots of output bits here - MessageDigest digest = null; - try - { - digest = MessageDigest.getInstance("SHA-256"); - } - catch (NoSuchAlgorithmException e) - { - throw new AssertionError(e); - } + MessageDigest digest = FBUtilities.newMessageDigest("SHA-256"); row.update(digest); return new MerkleTree.RowHash(row.key.token, digest.digest()); } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Wed Feb 23 19:32:42 2011 @@ -73,7 +73,10 @@ public class MigrationManager implements public void onRemove(InetAddress endpoint) { } - /** will either push or pull an updating depending on 
who is behind. */ + /** + * will either push or pull an updating depending on who is behind. + * fat clients should never push their schemas (since they have no local storage). + */ public static void rectify(UUID theirVersion, InetAddress endpoint) { UUID myVersion = DatabaseDescriptor.getDefsVersion(); @@ -84,7 +87,7 @@ public class MigrationManager implements logger.debug("My data definitions are old. Asking for updates since {}", myVersion.toString()); announce(myVersion, Collections.singleton(endpoint)); } - else + else if (!StorageService.instance.isClientMode()) { logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString()); pushMigrations(theirVersion, myVersion, endpoint); @@ -119,8 +122,7 @@ public class MigrationManager implements public static void passiveAnnounce(UUID version) { // this is for notifying nodes as they arrive in the cluster. - if (!StorageService.instance.isClientMode()) - Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version)); + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version)); logger.debug("Announcing my schema is " + version); } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb 23 19:32:42 2011 @@ -74,7 +74,7 @@ public class StorageProxy implements Sto private static final LatencyTracker counterWriteStats = new LatencyTracker(); private static boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled(); private static 
int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); - private static final String UNREACHABLE = "UNREACHABLE"; + public static final String UNREACHABLE = "UNREACHABLE"; private static final WritePerformer standardWritePerformer; private static final WritePerformer counterWritePerformer; @@ -796,17 +796,18 @@ public class StorageProxy implements Sto } hosts.add(host.getHostAddress()); } + + // we're done: the results map is ready to return to the client. the rest is just debug logging: if (results.get(UNREACHABLE) != null) logger.debug("Hosts not in agreement. Didn't get a response from everybody: " + StringUtils.join(results.get(UNREACHABLE), ",")); - // check for version disagreement. log the hosts that don't agree. for (Map.Entry<String, List<String>> entry : results.entrySet()) { + // check for version disagreement. log the hosts that don't agree. if (entry.getKey().equals(UNREACHABLE) || entry.getKey().equals(myVersion)) continue; for (String host : entry.getValue()) logger.debug("%s disagrees (%s)", host, entry.getKey()); } - if (results.size() == 1) logger.debug("Schemas are in agreement."); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Feb 23 19:32:42 2011 @@ -1239,13 +1239,19 @@ public class StorageService implements I { if (tableName.equals("system")) throw new RuntimeException("Cleanup of the system table is neither necessary nor wise"); - + for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) { cfStore.forceCleanup(); } } + public void scrub(String tableName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException + { + for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + cfStore.scrub(); + } + public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Feb 23 19:32:42 2011 @@ -159,6 +159,14 @@ public interface StorageServiceMBean public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** + * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace. + * If columnFamilies array is empty, all CFs are scrubbed. + * + * Scrubbed CFs will be snapshotted first. + */ + public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + + /** * Flush all memtables for the given column families, or all columnfamilies for the given table * if none are explicitly listed. 
* @param tableName Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Feb 23 19:32:42 2011 @@ -31,6 +31,8 @@ import java.util.concurrent.TimeoutExcep import java.util.zip.DataFormatException; import java.util.zip.Inflater; +import com.google.common.base.Predicates; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -712,11 +714,13 @@ public class CassandraServer implements } } - public String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException + public synchronized String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException { logger.debug("add_column_family"); state().hasColumnFamilyListAccess(Permission.WRITE); ThriftValidation.validateCfDef(cf_def); + validateSchemaAgreement(); + try { applyMigrationOnStage(new AddColumnFamily(convertToCFMetaData(cf_def))); @@ -736,10 +740,11 @@ public class CassandraServer implements } } - public String system_drop_column_family(String column_family) throws InvalidRequestException, TException + public synchronized String system_drop_column_family(String column_family) throws InvalidRequestException, TException { logger.debug("drop_column_family"); state().hasColumnFamilyListAccess(Permission.WRITE); + validateSchemaAgreement(); try { @@ -760,10 +765,11 @@ public class CassandraServer implements } } - public String system_add_keyspace(KsDef ks_def) throws InvalidRequestException, TException + public synchronized String system_add_keyspace(KsDef ks_def) throws InvalidRequestException, TException { 
logger.debug("add_keyspace"); state().hasKeyspaceListAccess(Permission.WRITE); + validateSchemaAgreement(); // generate a meaningful error if the user setup keyspace and/or column definition incorrectly for (CfDef cf : ks_def.cf_defs) @@ -805,10 +811,11 @@ public class CassandraServer implements } } - public String system_drop_keyspace(String keyspace) throws InvalidRequestException, TException + public synchronized String system_drop_keyspace(String keyspace) throws InvalidRequestException, TException { logger.debug("drop_keyspace"); state().hasKeyspaceListAccess(Permission.WRITE); + validateSchemaAgreement(); try { @@ -830,15 +837,15 @@ public class CassandraServer implements } /** update an existing keyspace, but do not allow column family modifications. */ - public String system_update_keyspace(KsDef ks_def) throws InvalidRequestException, TException + public synchronized String system_update_keyspace(KsDef ks_def) throws InvalidRequestException, TException { logger.debug("update_keyspace"); state().hasKeyspaceListAccess(Permission.WRITE); - ThriftValidation.validateTable(ks_def.name); if (ks_def.getCf_defs() != null && ks_def.getCf_defs().size() > 0) throw new InvalidRequestException("Keyspace update must not contain any column family definitions."); - + validateSchemaAgreement(); + try { KSMetaData ksm = new KSMetaData( @@ -863,18 +870,17 @@ public class CassandraServer implements } } - public String system_update_column_family(CfDef cf_def) throws InvalidRequestException, TException + public synchronized String system_update_column_family(CfDef cf_def) throws InvalidRequestException, TException { logger.debug("update_column_family"); state().hasColumnFamilyListAccess(Permission.WRITE); - if (cf_def.keyspace == null || cf_def.name == null) throw new InvalidRequestException("Keyspace and CF name must be set."); - CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace, cf_def.name)); if (oldCfm == null) throw new 
InvalidRequestException("Could not find column family definition to modify."); - + validateSchemaAgreement(); + try { // ideally, apply() would happen on the stage with the @@ -897,6 +903,15 @@ public class CassandraServer implements } } + private void validateSchemaAgreement() throws InvalidRequestException + { + // unreachable hosts don't count towards disagreement + Map<String, List<String>> versions = Maps.filterKeys(StorageProxy.describeSchemaVersions(), + Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); + if (versions.size() > 1) + throw new InvalidRequestException("Cluster schema does not yet agree"); + } + // @see CFMetaData.applyImplicitDefaults(). private CFMetaData convertToCFMetaData(CfDef cf_def) throws InvalidRequestException, ConfigurationException { Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Feb 23 19:32:42 2011 @@ -72,7 +72,7 @@ public class NodeCmd { public enum NodeCommand { RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH, DRAIN, - DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, + DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB, SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS, COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE, DISABLETHRIFT, ENABLETHRIFT @@ -114,6 +114,7 @@ public class NodeCmd { addCmdHelp(header, "repair [keyspace] [cfnames]", "Repair one or more column family"); addCmdHelp(header, "cleanup [keyspace] [cfnames]", "Run cleanup on one or more column family"); 
addCmdHelp(header, "compact [keyspace] [cfnames]", "Force a (major) compaction on one or more column family"); + addCmdHelp(header, "scrub [keyspace] [cfnames]", "Scrub (rebuild sstables for) one or more column family"); addCmdHelp(header, "invalidatekeycache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family"); addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family"); addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", "Print min and max compaction thresholds for a given column family"); @@ -574,6 +575,7 @@ public class NodeCmd { case COMPACT : case REPAIR : case FLUSH : + case SCRUB : case INVALIDATEKEYCACHE : case INVALIDATEROWCACHE : optionalKSandCFs(nc, arguments, probe); @@ -628,43 +630,22 @@ public class NodeCmd { private static void optionalKSandCFs(NodeCommand nc, String[] cmdArgs, NodeProbe probe) throws InterruptedException, IOException { - // Per-keyspace - if (cmdArgs.length == 1) + // cmdArgs[0] is "scrub" + // if there is one additional arg, it's the keyspace; more are columnfamilies + List<String> keyspaces = cmdArgs.length == 1 ? probe.getKeyspaces() : Arrays.asList(cmdArgs[1]); + for (String keyspace : keyspaces) { - for (String keyspace : probe.getKeyspaces()) + if (!probe.getKeyspaces().contains(keyspace)) { - switch (nc) - { - case REPAIR : probe.forceTableRepair(keyspace); break; - case INVALIDATEKEYCACHE : probe.invalidateKeyCaches(keyspace); break; - case INVALIDATEROWCACHE : probe.invalidateRowCaches(keyspace); break; - case FLUSH : - try { probe.forceTableFlush(keyspace); } - catch (ExecutionException ee) { err(ee, "Error occured while flushing keyspace " + keyspace); } - break; - case COMPACT : - try { probe.forceTableCompaction(keyspace); } - catch (ExecutionException ee) { err(ee, "Error occured while compacting keyspace " + keyspace); } - break; - case CLEANUP : - if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs. 
- try { probe.forceTableCleanup(keyspace); } - catch (ExecutionException ee) { err(ee, "Error occured while cleaning up keyspace " + keyspace); } - break; - default: - throw new RuntimeException("Unreachable code."); - } + System.err.println("Keyspace [" + keyspace + "] does not exist."); + System.exit(1); } } - // Per-cf (or listed cfs) in given keyspace - else + + // second loop so we're less likely to die halfway through due to invalid keyspace + for (String keyspace : keyspaces) { - String keyspace = cmdArgs[1]; - String[] columnFamilies = new String[cmdArgs.length - 2]; - for (int i = 0; i < columnFamilies.length; i++) - { - columnFamilies[i] = cmdArgs[i + 2]; - } + String[] columnFamilies = cmdArgs.length <= 2 ? new String[0] : Arrays.copyOfRange(cmdArgs, 2, cmdArgs.length); switch (nc) { case REPAIR : probe.forceTableRepair(keyspace, columnFamilies); break; @@ -679,9 +660,14 @@ public class NodeCmd { catch (ExecutionException ee) { err(ee, "Error occured during compaction"); } break; case CLEANUP : + if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs. 
try { probe.forceTableCleanup(keyspace, columnFamilies); } catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); } break; + case SCRUB : + try { probe.scrub(keyspace, columnFamilies); } + catch (ExecutionException ee) { err(ee, "Error occured while scrubbing keyspace " + keyspace); } + break; default: throw new RuntimeException("Unreachable code."); } Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Feb 23 19:32:42 2011 @@ -169,6 +169,11 @@ public class NodeProbe ssProxy.forceTableCleanup(tableName, columnFamilies); } + public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + ssProxy.scrub(tableName, columnFamilies); + } + public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { ssProxy.forceTableCompaction(tableName, columnFamilies); Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Wed Feb 23 19:32:42 2011 @@ -206,6 +206,8 @@ public class SSTableExport if (columnCount < PAGE_SIZE) break; + + out.print(","); } out.print(isSuperCF ? 
"}" : "]"); Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Wed Feb 23 19:32:42 2011 @@ -129,25 +129,19 @@ public class ByteBufferUtil */ public static byte[] getArray(ByteBuffer buffer) { - return getArray(buffer, buffer.position(), buffer.remaining()); - } + int length = buffer.remaining(); - public static byte[] getArray(ByteBuffer b, int start, int length) - { - if (b.hasArray()) + if (buffer.hasArray()) { - if (b.arrayOffset() == 0 && start == 0 && length == b.array().length) - return b.array(); + int start = buffer.position(); + if (buffer.arrayOffset() == 0 && start == 0 && length == buffer.array().length) + return buffer.array(); else - return Arrays.copyOfRange(b.array(), start + b.arrayOffset(), start + length + b.arrayOffset()); + return Arrays.copyOfRange(buffer.array(), start + buffer.arrayOffset(), start + length + buffer.arrayOffset()); } - + // else, DirectByteBuffer.get() is the fastest route byte[] bytes = new byte[length]; - - for (int i = 0; i < length; i++) - { - bytes[i] = b.get(start++); - } + buffer.duplicate().get(bytes); return bytes; } @@ -157,9 +151,9 @@ public class ByteBufferUtil * * @param buffer the array to traverse for looking for the object, may be <code>null</code> * @param valueToFind the value to find - * @param startIndex the start index to travers backwards from - * @return the last index of the value within the array, relative to buffer's arrayOffset - * [that is, between buffer.position() and buffer.limit()]; <code>-1</code> if not found. + * @param startIndex the start index (i.e. 
BB position) to travers backwards from + * @return the last index (i.e. BB position) of the value within the array + * [between buffer.position() and buffer.limit()]; <code>-1</code> if not found. */ public static int lastIndexOf(ByteBuffer buffer, byte valueToFind, int startIndex) { @@ -210,8 +204,7 @@ public class ByteBufferUtil } else { - for (int i = o.position(); i < o.limit(); i++) - clone.put(o.get(i)); + clone.put(o.duplicate()); clone.flip(); } @@ -221,16 +214,9 @@ public class ByteBufferUtil public static void arrayCopy(ByteBuffer buffer, int position, byte[] bytes, int offset, int length) { if (buffer.hasArray()) - { System.arraycopy(buffer.array(), buffer.arrayOffset() + position, bytes, offset, length); - } else - { - for (int i = 0; i < length; i++) - { - bytes[offset++] = buffer.get(position++); - } - } + ((ByteBuffer) buffer.duplicate().position(position)).get(bytes, offset, length); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/CLibrary.java Wed Feb 23 19:32:42 2011 @@ -157,7 +157,7 @@ public final class CLibrary } } - private static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException + public static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException { String osname = System.getProperty("os.name"); ProcessBuilder pb; Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1073896&r1=1073895&r2=1073896&view=diff 
============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Feb 23 19:32:42 2011 @@ -26,7 +26,6 @@ import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; @@ -35,12 +34,8 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Charsets; import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.primitives.Ints; import org.apache.commons.collections.iterators.CollatingIterator; -import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,24 +60,42 @@ public class FBUtilities private static volatile InetAddress localInetAddress_; - private static final ThreadLocal<MessageDigest> localMessageDigest = new ThreadLocal<MessageDigest>() + private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>() { @Override protected MessageDigest initialValue() { - try - { - return MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) - { - throw new AssertionError(e); - } + return newMessageDigest("MD5"); + } + + @Override + public MessageDigest get() + { + MessageDigest digest = super.get(); + digest.reset(); + return digest; } }; public static final int MAX_UNSIGNED_SHORT = 0xFFFF; + public static MessageDigest threadLocalMD5Digest() + { + return localMD5Digest.get(); + } + + public static MessageDigest newMessageDigest(String algorithm) + { + try + { + return MessageDigest.getInstance(algorithm); + } + catch (NoSuchAlgorithmException 
nsae) + { + throw new RuntimeException("the requested digest algorithm (" + algorithm + ") is not available", nsae); + } + } + /** * Parses a string representing either a fraction, absolute value or percentage. */ @@ -273,24 +286,14 @@ public class FBUtilities public static byte[] hash(ByteBuffer... data) { - byte[] result; - try + MessageDigest messageDigest = localMD5Digest.get(); + for(ByteBuffer block : data) { - MessageDigest messageDigest = localMessageDigest.get(); - messageDigest.reset(); - for(ByteBuffer block : data) - { - messageDigest.update(ByteBufferUtil.clone(block)); - } + messageDigest.update(ByteBufferUtil.clone(block)); + } - result = messageDigest.digest(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - return result; - } + return messageDigest.digest(); + } public static byte[] hexToBytes(String str) { Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/GuidGenerator.java Wed Feb 23 19:32:42 2011 @@ -22,7 +22,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.Random; @@ -34,21 +33,6 @@ public class GuidGenerator { private static Random myRand; private static SecureRandom mySecureRand; private static String s_id; - private static final ThreadLocal<MessageDigest> localMessageDigest = new ThreadLocal<MessageDigest>() - { - @Override - protected MessageDigest initialValue() - { - try - { - return MessageDigest.getInstance("MD5"); - } - catch 
(NoSuchAlgorithmException e) - { - throw new AssertionError(e); - } - } - }; static { if (System.getProperty("java.security.egd") == null) { @@ -104,8 +88,7 @@ public class GuidGenerator { .append(Long.toString(rand)); String valueBeforeMD5 = sbValueBeforeMD5.toString(); - localMessageDigest.get().reset(); - return ByteBuffer.wrap(localMessageDigest.get().digest(valueBeforeMD5.getBytes())); + return ByteBuffer.wrap(FBUtilities.threadLocalMD5Digest().digest(valueBeforeMD5.getBytes())); } /* Modified: cassandra/trunk/test/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/test/conf/cassandra.yaml (original) +++ cassandra/trunk/test/conf/cassandra.yaml Wed Feb 23 19:32:42 2011 @@ -98,6 +98,11 @@ keyspaces: column_type: Super default_validation_class: CounterColumnType + - name: Super5 + column_type: Super + rows_cached: 0 + keys_cached: 0 + - name: Indexed1 column_metadata: - name: birthdate @@ -207,3 +212,12 @@ keyspaces: - name: Standard2 keys_cached: 1.0 + + - name: RowCacheSpace + replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy + replication_factor: 1 + column_families: + - name: CachedCF + rows_cached: 100 + - name: CFWithoutCache + rows_cached: 0 Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Wed Feb 23 19:32:42 2011 @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import 
java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -176,7 +177,7 @@ public class CliTest extends CleanupHelp assertEquals(errStream.toString() + " processing " + statement, "", errStream.toString()); if (statement.startsWith("drop ") || statement.startsWith("create ") || statement.startsWith("update ")) { - assertTrue(result.matches("(.{8})-(.{4})-(.{4})-(.{4})-(.{12})\n")); + assert Pattern.compile("(.{8})-(.{4})-(.{4})-(.{4})-(.{12}).*", Pattern.DOTALL).matcher(result).matches() : result; } else if (statement.startsWith("set ")) { Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1073896&r1=1073895&r2=1073896&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Feb 23 19:32:42 2011 @@ -290,7 +290,7 @@ public class LazilyCompactedRowTest exte @Override protected AbstractCompactedRow getCompactedRow() { - return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE); + return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true); } } @@ -307,7 +307,7 @@ public class LazilyCompactedRowTest exte @Override protected AbstractCompactedRow getCompactedRow() { - return new PrecompactedRow(cfStore, rows, true, Integer.MAX_VALUE); + return new PrecompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true); } } } Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1073896&r1=1073895&r2=1073896&view=diff 
============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java Wed Feb 23 19:32:42 2011 @@ -163,4 +163,27 @@ public class BufferedRandomAccessFileTes // Expect this call to fail -- the distance from mark to current file pointer > 2gb. bpm = rw.bytesPastMark(mark); } + + @Test + public void testRead() throws IOException + { + File tmpFile = File.createTempFile("readtest", "bin"); + tmpFile.deleteOnExit(); + + BufferedRandomAccessFile rw = new BufferedRandomAccessFile(tmpFile.getPath(), "rw"); + rw.write(new byte[]{ 1 }); + + rw.seek(0); + // test read of buffered-but-not-yet-written data + byte[] buffer = new byte[1]; + assert rw.read(buffer) == 1; + assert buffer[0] == 1; + rw.close(); + + // test read of not-yet-buffered data + rw = new BufferedRandomAccessFile(tmpFile.getPath(), "rw"); + assert rw.read(buffer) == 1; + assert buffer[0] == 1; + } + }
