Author: brandonwilliams Date: Sat Feb 5 20:19:31 2011 New Revision: 1067508
URL: http://svn.apache.org/viewvc?rev=1067508&view=rev Log: Merge from 0.7. I hope. Modified: cassandra/trunk/ (props changed) cassandra/trunk/CHANGES.txt cassandra/trunk/conf/cassandra-env.sh cassandra/trunk/debian/changelog cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed) cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed) cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java Propchange: cassandra/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7:1026516-1066873 +/cassandra/branches/cassandra-0.7:1026516-1067497 /cassandra/branches/cassandra-0.7.0:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3:774578-796573 Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Sat Feb 5 20:19:31 2011 @@ -13,7 +13,6 @@ the nagle/delayed ack problem (CASSANDRA-1896) * check log4j configuration for changes every 10s (CASSANDRA-1525, 1907) * more-efficient cross-DC replication (CASSANDRA-1530, -2051) - * upgrade to TFastFramedTransport (CASSANDRA-1743) * avoid polluting page cache with commitlog or sstable writes and seq scan operations (CASSANDRA-1470) * add RMI authentication options to nodetool (CASSANDRA-1921) @@ -62,6 +61,7 @@ * ignore messages from newer versions, keep track of nodes in gossip regardless of version (CASSANDRA-1970) + 0.7.0-final * fix offsets to ByteBuffer.get (CASSANDRA-1939) Modified: cassandra/trunk/conf/cassandra-env.sh URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/conf/cassandra-env.sh (original) +++ cassandra/trunk/conf/cassandra-env.sh Sat Feb 5 20:19:31 2011 @@ -132,6 +132,9 @@ JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatin # JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime" # JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc.log" +# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414 +# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414" + # Prefer binding to IPv4 network intefaces (when net.ipv6.bindv6only=1). See # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version: # comment out this entry to enable IPv6 support). Modified: cassandra/trunk/debian/changelog URL: http://svn.apache.org/viewvc/cassandra/trunk/debian/changelog?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/debian/changelog (original) +++ cassandra/trunk/debian/changelog Sat Feb 5 20:19:31 2011 @@ -2,7 +2,7 @@ cassandra (0.7.1) unstable; urgency=low * New stable point release. - -- Eric Evans <[email protected]> Fri, 28 Jan 2011 13:56:19 -0600 + -- Eric Evans <[email protected]> Fri, 04 Feb 2011 12:57:52 -0600 cassandra (0.7.0~rc4) unstable; urgency=low Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1066873 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1067497 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1066873 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1067497 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1066873 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1067497 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1066873 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1067497 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573 Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Feb 5 20:19:31 2011 @@ -1,5 +1,5 @@ /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1064713,1066843 -/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1066873 +/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1067497 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198 Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Sat Feb 5 20:19:31 2011 @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.util.*; import com.google.common.base.Charsets; @@ -963,7 +964,7 @@ public class CliClient extends CliUserHe } private void executeList(Tree statement) - throws TException, InvalidRequestException, NotFoundException, IllegalAccessException, InstantiationException, NoSuchFieldException, UnavailableException, TimedOutException + throws TException, InvalidRequestException, NotFoundException, IllegalAccessException, InstantiationException, NoSuchFieldException, UnavailableException, TimedOutException, CharacterCodingException { if (!CliMain.isConnected() || !hasKeySpace()) return; @@ -1923,7 +1924,7 @@ public class CliClient extends CliUserHe * @throws NoSuchFieldException - column not found */ private void printSliceList(CfDef columnFamilyDef, List<KeySlice> slices) - throws NotFoundException, TException, IllegalAccessException, InstantiationException, NoSuchFieldException + throws NotFoundException, TException, IllegalAccessException, InstantiationException, NoSuchFieldException, CharacterCodingException { AbstractType validator; String columnFamilyName = columnFamilyDef.getName(); Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java Sat Feb 5 20:19:31 2011 @@ -57,9 +57,9 @@ public class CliUserHelp { put(ColumnFamilyArgument.COMMENT, "Human-readable column family description. Any string is acceptable"); put(ColumnFamilyArgument.COMPARATOR, "The class used as a comparator when sorting column names.\n Valid options include: AsciiType, BytesType, LexicalUUIDType,\n LongType, TimeUUIDType, and UTF8Type"); put(ColumnFamilyArgument.SUBCOMPARATOR, "Comparator for sorting subcolumn names, for Super columns only"); - put(ColumnFamilyArgument.MEMTABLE_OPERATIONS, "Flush memtables after this many operations"); - put(ColumnFamilyArgument.MEMTABLE_THROUGHPUT, "... or after this many bytes have been written"); - put(ColumnFamilyArgument.MEMTABLE_FLUSH_AFTER, "... or after this many seconds"); + put(ColumnFamilyArgument.MEMTABLE_OPERATIONS, "Flush memtables after this many operations (in millions)"); + put(ColumnFamilyArgument.MEMTABLE_THROUGHPUT, "... or after this many MB have been written"); + put(ColumnFamilyArgument.MEMTABLE_FLUSH_AFTER, "... or after this many minutes"); put(ColumnFamilyArgument.ROWS_CACHED, "Number or percentage of rows to cache"); put(ColumnFamilyArgument.ROW_CACHE_SAVE_PERIOD, "Period with which to persist the row cache, in seconds"); put(ColumnFamilyArgument.KEYS_CACHED, "Number or percentage of keys to cache"); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Sat Feb 5 20:19:31 2011 @@ -72,6 +72,8 @@ public class DefinitionsUpdateResponseVe try { m.apply(); + // update gossip, but don't contact nodes directly + m.passiveAnnounce(); } catch (ConfigurationException ex) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Sat Feb 5 20:19:31 2011 @@ -23,11 +23,14 @@ import java.lang.management.ManagementFa import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import static com.google.common.base.Charsets.UTF_8; + +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +45,7 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; @@ -226,17 +230,53 @@ public class HintedHandOffManager implem int index = ByteBufferUtil.lastIndexOf(joined, SEPARATOR.getBytes()[0], joined.limit()); if (index == -1 || index < (joined.position() + 1)) - throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.string(joined)); + throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.bytesToHex(joined)); + + try + { + return new String[] { ByteBufferUtil.string(joined, joined.position(), index - joined.position()), + ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) }; + } + catch (CharacterCodingException e) + { + throw new RuntimeException(e); + } + } - return new String[] { - ByteBufferUtil.string(joined, joined.position(), index - joined.position()), - ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) - }; + private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException + { + Gossiper gossiper = Gossiper.instance; + int waited = 0; + while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals( + gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value)) + { + Thread.sleep(1000); + waited += 1000; + if (waited > 2 * StorageService.RING_DELAY) + throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); + } + logger_.debug("schema for {} matches local schema", endpoint); + return waited; } - private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException + private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException { + logger_.info("Checking remote schema before delivering hints"); + int waited = waitForSchemaAgreement(endpoint); + // sleep a random amount to stagger handoff delivery from different replicas. + // (if we had to wait, then gossiper randomness took care of that for us already.) + if (waited == 0) { + int sleep = new Random().nextInt(60000); + logger_.info("Sleeping {}ms to stagger hint delivery", sleep); + Thread.sleep(sleep); + } + if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive()) + { + logger_.info("Endpoint {} died before hint delivery, aborting", endpoint); + return; + } logger_.info("Started hinted handoff for endpoint " + endpoint); + queuedDeliveries.remove(endpoint); // 1. Get the key of the endpoint we need to handoff Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Sat Feb 5 20:19:31 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import com.google.common.base.Charsets; @@ -36,7 +37,14 @@ public class AsciiType extends BytesType @Override public String getString(ByteBuffer bytes) { - return ByteBufferUtil.string(bytes, Charsets.US_ASCII); + try + { + return ByteBufferUtil.string(bytes, Charsets.US_ASCII); + } + catch (CharacterCodingException e) + { + throw new MarshalException("Invalid ascii bytes " + ByteBufferUtil.bytesToHex(bytes)); + } } public ByteBuffer fromString(String source) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java Sat Feb 5 20:19:31 2011 @@ -22,12 +22,10 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; -import java.util.Arrays; import com.google.common.base.Charsets; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; public class UTF8Type extends BytesType { @@ -39,11 +37,11 @@ public class UTF8Type extends BytesType { try { - return FBUtilities.decodeToUTF8(bytes); + return ByteBufferUtil.string(bytes, Charsets.UTF_8); } catch (CharacterCodingException e) { - throw new MarshalException("invalid UTF8 bytes " + ByteBufferUtil.string(bytes)); + throw new MarshalException("invalid UTF8 bytes " + ByteBufferUtil.bytesToHex(bytes)); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Sat Feb 5 20:19:31 2011 @@ -175,10 +175,15 @@ public abstract class Migration if (StorageService.instance.isClientMode()) return; - // immediate notification for esiting nodes. + // immediate notification for existing nodes. MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers()); } - + + public final void passiveAnnounce() + { + MigrationManager.passiveAnnounce(newVersion); + } + public static UUID getLastMigrationId() { DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY); Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/dht/CollatingOrderPreservingPartitioner.java Sat Feb 5 20:19:31 2011 @@ -25,7 +25,9 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import org.apache.cassandra.utils.FBUtilities; +import com.google.common.base.Charsets; + +import org.apache.cassandra.utils.ByteBufferUtil; public class CollatingOrderPreservingPartitioner extends AbstractByteOrderedPartitioner { @@ -39,7 +41,7 @@ public class CollatingOrderPreservingPar String skey; try { - skey = FBUtilities.decodeToUTF8(key); + skey = ByteBufferUtil.string(key, Charsets.UTF_8); } catch (CharacterCodingException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java Sat Feb 5 20:19:31 2011 @@ -123,7 +123,14 @@ public class OrderPreservingPartitioner public Token<String> fromByteArray(ByteBuffer bytes) { - return new StringToken(ByteBufferUtil.string(bytes, Charsets.UTF_8)); + try + { + return new StringToken(ByteBufferUtil.string(bytes, Charsets.UTF_8)); + } + catch (CharacterCodingException e) + { + throw new RuntimeException(e); + } } public String toString(Token<String> stringToken) @@ -152,7 +159,7 @@ public class OrderPreservingPartitioner String skey; try { - skey = FBUtilities.decodeToUTF8(key); + skey = ByteBufferUtil.string(key, Charsets.UTF_8); } catch (CharacterCodingException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java Sat Feb 5 20:19:31 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra.dht; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.util.*; import org.apache.cassandra.db.DecoratedKey; @@ -61,7 +62,15 @@ public class RandomPartitioner implement assert splitPoint != -1; // and decode the token and key - String token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position(), UTF_8); + String token = null; + try + { + token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position(), UTF_8); + } + catch (CharacterCodingException e) + { + throw new RuntimeException(e); + } ByteBuffer key = fromdisk.duplicate(); key.position(splitPoint + 1); return new DecoratedKey<BigIntegerToken>(new BigIntegerToken(token), key); Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Sat Feb 5 20:19:31 2011 @@ -257,7 +257,7 @@ public class Gossiper implements IFailur liveEndpoints.remove(endpoint); unreachableEndpoints.remove(endpoint); - endpointStateMap.remove(endpoint); + // do not remove endpointState until the quarantine expires FailureDetector.instance.remove(endpoint); versions.remove(endpoint); justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); @@ -325,9 +325,7 @@ public class Gossiper implements IFailur ByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos); - if (logger.isTraceEnabled()) - logger.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length); - return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray()); + return new Message(localEndpoint_, StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray()); } Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException @@ -433,7 +431,8 @@ public class Gossiper implements IFailur else { logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip"); - removeEndpoint(endpoint); + if (!justRemovedEndpoints_.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client + removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state } } @@ -453,6 +452,7 @@ public class Gossiper implements IFailur if (logger.isDebugEnabled()) logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over"); justRemovedEndpoints.remove(entry.getKey()); + endpointStateMap_.remove(entry.getKey()); } } } @@ -465,8 +465,6 @@ public class Gossiper implements IFailur EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) { - if (logger.isTraceEnabled()) - logger.trace("Scanning for state greater than " + version + " for " + forEndpoint); EndpointState epState = endpointStateMap.get(forEndpoint); EndpointState reqdEndpointState = null; @@ -484,6 +482,8 @@ public class Gossiper implements IFailur if ( localHbVersion > version ) { reqdEndpointState = new EndpointState(epState.getHeartBeatState()); + if (logger_.isTraceEnabled()) + logger_.trace("local heartbeat version " + localHbVersion + " greater than " + version + " for " + forEndpoint); } /* Accumulate all application states whose versions are greater than "version" variable */ for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet()) @@ -656,6 +656,11 @@ public class Gossiper implements IFailur else if (logger.isTraceEnabled()) logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep); } + else + { + if (logger_.isTraceEnabled()) + logger_.trace("Ignoring remote generation " + remoteGeneration + " < " + localGeneration); + } } else { @@ -671,9 +676,9 @@ public class Gossiper implements IFailur int oldVersion = localState.getHeartBeatState().getHeartBeatVersion(); Map<ApplicationState, VersionedValue> localAppStateMap = localState.getApplicationStateMap(); - localState.setHeartBeatState(remoteState.getHeartBeatState()); - if (logger.isTraceEnabled()) - logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ..."); + localState.setHeartBeatState(remoteHbState); + if (logger_.isTraceEnabled()) + logger_.trace("Updating heartbeat state generation to " + remoteHbState.getGeneration() + " from " + localHbState.getGeneration() + " for " + addr); for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet()) { @@ -700,6 +705,8 @@ public class Gossiper implements IFailur { /* We are here since we have no data for this endpoint locally so request everthing. */ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0) ); + if (logger_.isTraceEnabled()) + logger_.trace("requestAll for " + gDigest.getEndpoint()); } /* Send all the data with version greater than maxRemoteVersion */ Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Sat Feb 5 20:19:31 2011 @@ -89,15 +89,21 @@ public class MigrationManager implements } } - /** announce my version to a set of hosts. They may culminate with them sending me migrations. */ + /** actively announce my version to a set of hosts via rpc. They may culminate with them sending me migrations. */ public static void announce(UUID version, Set<InetAddress> hosts) { Message msg = makeVersionMessage(version); for (InetAddress host : hosts) MessagingService.instance().sendOneWay(msg, host); - // this is for notifying nodes as they arrive in the cluster. + passiveAnnounce(version); + } + + /** announce my version passively over gossip **/ + public static void passiveAnnounce(UUID version) + { if (!StorageService.instance.isClientMode()) Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version)); + logger.debug("Announcing my schema is " + version); } /** @@ -152,6 +158,7 @@ public class MigrationManager implements throw new IOException(e); } } + passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip } /** pushes migrations from this host to another host */ Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Feb 5 20:19:31 2011 @@ -1602,9 +1602,10 @@ public class StorageService implements I calculatePendingRanges(); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken())); + logger_.info("Announcing that I have left the ring for " + RING_DELAY + "ms"); try { - Thread.sleep(2 * Gossiper.intervalInMillis); + Thread.sleep(RING_DELAY); } catch (InterruptedException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Sat Feb 5 20:19:31 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.thrift; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; @@ -27,11 +26,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.thrift.TBinaryProtocol; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; -import org.apache.thrift.transport.TFastFramedTransport; +import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; @@ -123,8 +121,8 @@ public class CassandraDaemon extends org if (DatabaseDescriptor.isThriftFramed()) { int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize(); - inTransportFactory = new TFastFramedTransport.Factory(64 * 1024, tFramedTransportSize); - outTransportFactory = new TFastFramedTransport.Factory(64 * 1024, tFramedTransportSize); + inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); + outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize); } else Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Sat Feb 5 20:19:31 2011 @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.util.Arrays; @@ -100,28 +101,27 @@ public class ByteBufferUtil return compareUnsigned(o1, ByteBuffer.wrap(o2)); } - public static String string(ByteBuffer buffer) + public static String string(ByteBuffer buffer) throws CharacterCodingException { return string(buffer, Charset.defaultCharset()); } - public static String string(ByteBuffer buffer, Charset charset) + public static String string(ByteBuffer buffer, int offset, int length) throws CharacterCodingException { - return string(buffer, buffer.position(), buffer.remaining(), charset); + return string(buffer, offset, length, Charset.defaultCharset()); } - public static String string(ByteBuffer buffer, int offset, int length) + public static String string(ByteBuffer buffer, int offset, int length, Charset charset) throws CharacterCodingException { - return string(buffer, offset, length, Charset.defaultCharset()); + ByteBuffer copy = buffer.duplicate(); + copy.position(buffer.position() + offset); + copy.limit(copy.position() + length); + return string(buffer, charset); } - public static String string(ByteBuffer buffer, int offset, int length, Charset charset) + public static String string(ByteBuffer buffer, Charset charset) throws CharacterCodingException { - if (buffer.hasArray()) - return new String(buffer.array(), buffer.arrayOffset() + offset, length, charset); - - byte[] buff = getArray(buffer, offset, length); - return new String(buff, charset); + return charset.newDecoder().decode(buffer.duplicate()).toString(); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Feb 5 20:19:31 2011 @@ -452,11 +452,6 @@ public class FBUtilities return utflen; } - public static String decodeToUTF8(ByteBuffer bytes) throws CharacterCodingException - { - return Charsets.UTF_8.newDecoder().decode(bytes.duplicate()).toString(); - } - public static String resourceToFile(String filename) throws ConfigurationException { ClassLoader loader = PropertyFileSnitch.class.getClassLoader(); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Sat Feb 5 20:19:31 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.*; @@ -511,7 +512,14 @@ public class TableTest extends CleanupHe List<String> L = new ArrayList<String>(); for (IColumn column : columns) { - L.add(ByteBufferUtil.string(column.name())); + try + { + L.add(ByteBufferUtil.string(column.name())); + } + catch (CharacterCodingException e) + { + throw new AssertionError(e); + } } List<String> names = new ArrayList<String>(columnNames.length); Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java?rev=1067508&r1=1067507&r2=1067508&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java Sat Feb 5 20:19:31 2011 @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Arrays; +import com.google.common.base.Charsets; import org.junit.Test; public class FBUtilitiesTest @@ -126,6 +127,6 @@ public class FBUtilitiesTest public void testDecode() throws IOException { ByteBuffer bytes = ByteBuffer.wrap(new byte[]{(byte)0xff, (byte)0xfe}); - FBUtilities.decodeToUTF8(bytes); + ByteBufferUtil.string(bytes, Charsets.UTF_8); } }
