Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Sep 13 21:08:48 2011 @@ -165,7 +165,12 @@ public class BootStrapper } if (endpoints.isEmpty()) - throw new RuntimeException("No other nodes seen! Unable to bootstrap"); + throw new RuntimeException("No other nodes seen! Unable to bootstrap." + + "If you intended to start a single-node cluster, you should make sure " + + "your broadcast_address (or listen_address) is listed as a seed. " + + "Otherwise, you need to determine why the seed being contacted " + + "has no knowledge of the rest of the cluster. Usually, this can be solved " + + "by giving all nodes the same seed list."); Collections.sort(endpoints, new Comparator<InetAddress>() { public int compare(InetAddress ia1, InetAddress ia2)
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Sep 13 21:08:48 2011 @@ -143,6 +143,8 @@ public class ColumnFamilyRecordReader ex return true; } + // we don't use endpointsnitch since we are trying to support hadoop nodes that are + // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least. private String getLocation() { InetAddress[] localAddresses; @@ -173,7 +175,7 @@ public class ColumnFamilyRecordReader ex } } } - throw new UnsupportedOperationException("no local connection available"); + return split.getLocations()[0]; } private class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>> Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Tue Sep 13 21:08:48 2011 @@ -25,6 +25,8 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import org.apache.cassandra.utils.ByteBufferUtil; + public class MappedFileDataInput extends AbstractDataInput implements FileDataInput { private final MappedByteBuffer buffer; @@ -117,11 +119,18 @@ public class MappedFileDataInput extends throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested", remaining, length)); + if (length == 0) + return ByteBufferUtil.EMPTY_BYTE_BUFFER; + ByteBuffer bytes = buffer.duplicate(); bytes.position(buffer.position() + position).limit(buffer.position() + position + length); position += length; - return bytes; + // we have to copy the data in case we unreference the underlying sstable. See CASSANDRA-3179 + ByteBuffer clone = ByteBuffer.allocate(bytes.remaining()); + clone.put(bytes); + clone.flip(); + return clone; } @Override Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Sep 13 21:08:48 2011 @@ -67,6 +67,8 @@ public class IncomingTcpConnection exten int header = input.readInt(); isStream = MessagingService.getBits(header, 3, 1) == 1; version = MessagingService.getBits(header, 15, 8); + if (logger.isDebugEnabled()) + logger.debug("Version for " + socket.getInetAddress() + " is " + version); if (isStream) { if (version == MessagingService.version_) @@ -98,6 +100,7 @@ public class IncomingTcpConnection exten else if (msg != null) { Gossiper.instance.setVersion(msg.getFrom(), version); + logger.debug("set version for {} to {}", socket.getInetAddress(), version); } // loop to get the next message. @@ -108,6 +111,7 @@ public class IncomingTcpConnection exten header = input.readInt(); assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream; version = MessagingService.getBits(header, 15, 8); + logger.debug("Version is now {}", version); receiveMessage(input, version); } } @@ -153,7 +157,7 @@ public class IncomingTcpConnection exten MessagingService.instance().receive(message, id); return message; } - logger.info("Received connection from newer protocol version. Ignorning message."); + logger.debug("Received connection from newer protocol version {}. Ignorning message", version); return null; } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Sep 13 21:08:48 2011 @@ -346,15 +346,15 @@ public abstract class AbstractCassandraD } start(); - } catch (Throwable e) + } + catch (Throwable e) { - String msg = "Exception encountered during startup."; - logger.error(msg, e); + logger.error("Exception encountered during startup", e); // try to warn user on stdout too, if we haven't already detached - System.out.println(msg); e.printStackTrace(); - + System.out.println("Exception encountered during startup: " + e.getMessage()); + System.exit(3); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Sep 13 21:08:48 2011 @@ -107,7 +107,7 @@ public class GCInspector if (previousCount == null) previousCount = 0L; - if (count == previousCount) + if (count.equals(previousCount)) continue; gccounts.put(gc.getName(), count); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Sep 13 21:08:48 2011 @@ -79,7 +79,7 @@ public class RowRepairResolver extends A endpoints.add(message.getFrom()); // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643 - int liveColumns = cf.getLiveColumnCount(); + int liveColumns = cf == null ? 0 : cf.getLiveColumnCount(); if (liveColumns > maxLiveColumns) maxLiveColumns = liveColumns; } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 13 21:08:48 2011 @@ -31,9 +31,7 @@ import javax.lang.model.type.TypeKind; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; +import com.google.common.collect.*; import org.apache.cassandra.config.*; import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; @@ -44,6 +42,7 @@ import org.apache.cassandra.concurrent.D import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Table; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.*; import org.apache.cassandra.gms.*; @@ -641,11 +640,6 @@ public class StorageService implements I */ public Map<Range, List<String>> getRangeToEndpointMap(String keyspace) { - // some people just want to get a visual representation of things. Allow null and set it to the first - // non-system table. - if (keyspace == null) - keyspace = Schema.instance.getNonSystemTables().get(0); - /* All the ranges for the tokens */ Map<Range, List<String>> map = new HashMap<Range, List<String>>(); for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) @@ -656,17 +650,27 @@ public class StorageService implements I } /** + * Return the rpc address associated with an endpoint as a string. + * @param endpoint The endpoint to get rpc address for + * @return + */ + public String getRpcaddress(InetAddress endpoint) + { + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + return DatabaseDescriptor.getRpcAddress().getHostAddress(); + else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null) + return endpoint.getHostAddress(); + else + return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value; + } + + /** * for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace. * @param keyspace * @return */ public Map<Range, List<String>> getRangeToRpcaddressMap(String keyspace) { - // some people just want to get a visual representation of things. Allow null and set it to the first - // non-system table. - if (keyspace == null) - keyspace = Schema.instance.getNonSystemTables().get(0); - /* All the ranges for the tokens */ Map<Range, List<String>> map = new HashMap<Range, List<String>>(); for (Map.Entry<Range,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet()) @@ -674,12 +678,7 @@ public class StorageService implements I List<String> rpcaddrs = new ArrayList<String>(); for (InetAddress endpoint: entry.getValue()) { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - rpcaddrs.add(DatabaseDescriptor.getRpcAddress().getHostAddress()); - else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null) - rpcaddrs.add(endpoint.getHostAddress()); - else - rpcaddrs.add(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value); + rpcaddrs.add(getRpcaddress(endpoint)); } map.put(entry.getKey(), rpcaddrs); } @@ -704,6 +703,11 @@ public class StorageService implements I public Map<Range, List<InetAddress>> getRangeToAddressMap(String keyspace) { + // some people just want to get a visual representation of things. Allow null and set it to the first + // non-system table. + if (keyspace == null) + keyspace = Schema.instance.getNonSystemTables().get(0); + List<Range> ranges = getAllRanges(tokenMetadata_.sortedTokens()); return constructRangeToEndpointMap(keyspace, ranges); } @@ -1096,10 +1100,9 @@ public class StorageService implements I // all leaving nodes are gone. for (Range range : affectedRanges) { - Collection<InetAddress> currentEndpoints = strategy.calculateNaturalEndpoints(range.right, tm); - Collection<InetAddress> newEndpoints = strategy.calculateNaturalEndpoints(range.right, allLeftMetadata); - newEndpoints.removeAll(currentEndpoints); - pendingRanges.putAll(range, newEndpoints); + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata)); + pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints)); } // At this stage pendingRanges has been updated according to leave operations. We can @@ -2086,8 +2089,9 @@ public class StorageService implements I for (Range toStream : rangesPerTable.left) { - List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone); - rangeWithEndpoints.putAll(toStream, endpoints); + Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetadata_)); + Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone)); + rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints)); } // associating table with range-to-endpoints map Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1170350&r1=1170349&r2=1170350&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Sep 13 21:08:48 2011 @@ -798,6 +798,7 @@ public class CassandraServer implements { Range range = entry.getKey(); List<String> endpoints = new ArrayList<String>(); + List<String> rpc_endpoints = new ArrayList<String>(); List<EndpointDetails> epDetails = new ArrayList<EndpointDetails>(); for (InetAddress endpoint : entry.getValue()) @@ -823,13 +824,16 @@ public class CassandraServer implements details.datacenter = appStateDc.value; endpoints.add(details.host); + rpc_endpoints.add(StorageService.instance.getRpcaddress(endpoint)); if (details.port != -1 || details.datacenter != null) epDetails.add(details); } - ranges.add(new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints) - .setEndpoint_details(epDetails)); + TokenRange tr = new TokenRange(tf.toString(range.left), tf.toString(range.right), endpoints) + .setEndpoint_details(epDetails) + .setRpc_endpoints(rpc_endpoints); + ranges.add(tr); } return ranges;