http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 0276238..c600789 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -19,7 +19,6 @@ package org.apache.cassandra.service; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; @@ -61,6 +60,7 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@ -208,7 +208,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai Collection<Range<Token>> range, String keyspace, RepairParallelism parallelismDegree, - Set<InetAddress> endpoints, + Set<InetAddressAndPort> endpoints, boolean isIncremental, boolean pullRepair, boolean force, @@ -297,12 +297,12 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * * @return neighbors with whom we share the provided range */ - public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, - Range<Token> toRepair, Collection<String> dataCenters, - Collection<String> hosts) + public static Set<InetAddressAndPort> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, + Range<Token> toRepair, Collection<String> dataCenters, + Collection<String> hosts) { StorageService ss = StorageService.instance; - Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName); + Map<Range<Token>, List<InetAddressAndPort>> replicaSets = ss.getRangeToAddressMap(keyspaceName); Range<Token> rangeSuperSet = null; for (Range<Token> range : keyspaceLocalRanges) { @@ -322,17 +322,17 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) return Collections.emptySet(); - Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet)); - neighbors.remove(FBUtilities.getBroadcastAddress()); + Set<InetAddressAndPort> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet)); + neighbors.remove(FBUtilities.getBroadcastAddressAndPort()); if (dataCenters != null && !dataCenters.isEmpty()) { TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology(); - Set<InetAddress> dcEndpoints = Sets.newHashSet(); - Multimap<String,InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints(); + Set<InetAddressAndPort> dcEndpoints = Sets.newHashSet(); + Multimap<String,InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints(); for (String dc : dataCenters) { - Collection<InetAddress> c = dcEndpointsMap.get(dc); + Collection<InetAddressAndPort> c = dcEndpointsMap.get(dc); if (c != null) dcEndpoints.addAll(c); } @@ -340,13 +340,13 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } else if (hosts != null && !hosts.isEmpty()) { - Set<InetAddress> specifiedHost = new HashSet<>(); + Set<InetAddressAndPort> specifiedHost = new HashSet<>(); for (final String host : hosts) { try { - final InetAddress endpoint = InetAddress.getByName(host.trim()); - if (endpoint.equals(FBUtilities.getBroadcastAddress()) || neighbors.contains(endpoint)) + final InetAddressAndPort endpoint = InetAddressAndPort.getByName(host.trim()); + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || neighbors.contains(endpoint)) specifiedHost.add(endpoint); } catch (UnknownHostException e) @@ -355,7 +355,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } - if (!specifiedHost.contains(FBUtilities.getBroadcastAddress())) + if (!specifiedHost.contains(FBUtilities.getBroadcastAddressAndPort())) throw new IllegalArgumentException("The current host must be part of the repair"); if (specifiedHost.size() <= 1) @@ -366,7 +366,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors)); } - specifiedHost.remove(FBUtilities.getBroadcastAddress()); + specifiedHost.remove(FBUtilities.getBroadcastAddressAndPort()); return specifiedHost; } @@ -393,7 +393,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } - public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) + public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores) { long repairedAt = getRepairedAt(options); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); @@ -412,10 +412,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return false; } - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { status.set(false); - failedNodes.add(from.getHostAddress()); + failedNodes.add(from.toString()); prepareLatch.countDown(); } }; @@ -424,7 +424,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai for (ColumnFamilyStore cfs : columnFamilyStores) tableIds.add(cfs.metadata.id); - for (InetAddress neighbour : endpoints) + for (InetAddressAndPort neighbour : endpoints) { if (FailureDetector.instance.isAlive(neighbour)) { @@ -471,7 +471,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai throw new RuntimeException(errorMsg); } - public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) + public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) { assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE; if (!registeredForEndpointChanges) @@ -517,7 +517,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return parentRepairSessions.remove(parentSessionId); } - public void handleMessage(InetAddress endpoint, RepairMessage message) + public void handleMessage(InetAddressAndPort endpoint, RepairMessage message) { RepairJobDesc desc = message.desc; RepairSession session = sessions.get(desc.sessionId); @@ -551,10 +551,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public final boolean isIncremental; public final boolean isGlobal; public final long repairedAt; - public final InetAddress coordinator; + public final InetAddressAndPort coordinator; public final PreviewKind previewKind; - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) + public ParentRepairSession(InetAddressAndPort coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) { this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) @@ -636,18 +636,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai If the coordinator node dies we should remove the parent repair session from the other nodes. This uses the same notifications as we get in RepairSession */ - public void onJoin(InetAddress endpoint, EndpointState epState) {} - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {} + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddressAndPort endpoint, EndpointState state) {} + public void onDead(InetAddressAndPort endpoint, EndpointState state) {} - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { convict(endpoint, Double.MAX_VALUE); } - public void onRestart(InetAddress endpoint, EndpointState state) + public void onRestart(InetAddressAndPort endpoint, EndpointState state) { convict(endpoint, Double.MAX_VALUE); } @@ -661,7 +661,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * @param ep endpoint to be convicted * @param phi the value of phi with with ep was convicted */ - public void convict(InetAddress ep, double phi) + public void convict(InetAddressAndPort ep, double phi) { // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index 4e26101..e373fb6 100644 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -18,12 +18,12 @@ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> @@ -59,7 +59,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> return wrapped.isLatencyForSnitch(); } - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { wrapped.onFailure(from, failureReason); } @@ -84,7 +84,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> return wrapped.totalEndpoints(); } - protected boolean waitingFor(InetAddress from) + protected boolean waitingFor(InetAddressAndPort from) { return wrapped.waitingFor(from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 51219e6..130f3fd 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -46,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -231,6 +232,8 @@ public class CassandraDaemon } }); + SystemKeyspaceMigrator40.migrate(); + // Populate token metadata before flushing, for token-aware sstable partitioning (#6696) StorageService.instance.populateTokenMetadata(); @@ -377,7 +380,7 @@ public class CassandraDaemon ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); - if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) + if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) Gossiper.waitToSettle(); // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used @@ -445,7 +448,7 @@ public class CassandraDaemon { try { - logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName()); + logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName() + ":" + DatabaseDescriptor.getStoragePort() + ":" + DatabaseDescriptor.getSSLStoragePort()); } catch (UnknownHostException e1) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index e41cc4f..97b6172 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -60,7 +60,7 @@ public class ClientState { // We want these system cfs to be always readable to authenticated users since many tools rely on them // (nodetool, cqlsh, bulkloader, etc.) - for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.PEERS)) + for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2)) READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf)); SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 54f4b0c..82db754 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.TimeoutException; @@ -40,6 +39,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.ExcludingBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; @@ -87,7 +87,7 @@ public class DataResolver extends ResponseResolver // at the beginning of this method), so grab the response count once and use that through the method. int count = responses.size(); List<UnfilteredPartitionIterator> iters = new ArrayList<>(count); - InetAddress[] sources = new InetAddress[count]; + InetAddressAndPort[] sources = new InetAddressAndPort[count]; for (int i = 0; i < count; i++) { MessageIn<ReadResponse> msg = responses.get(i); @@ -120,7 +120,7 @@ public class DataResolver extends ResponseResolver } private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, - InetAddress[] sources, + InetAddressAndPort[] sources, DataLimits.Counter mergedResultCounter) { // If we have only one results, there is no read repair to do and we can't get short reads @@ -140,9 +140,9 @@ public class DataResolver extends ResponseResolver private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener { - private final InetAddress[] sources; + private final InetAddressAndPort[] sources; - private RepairMergeListener(InetAddress[] sources) + private RepairMergeListener(InetAddressAndPort[] sources) { this.sources = sources; } @@ -471,7 +471,7 @@ public class DataResolver extends ResponseResolver sendRepairMutation(repairs[i].build(), sources[i]); } - private void sendRepairMutation(PartitionUpdate partition, InetAddress destination) + private void sendRepairMutation(PartitionUpdate partition, InetAddressAndPort destination) { Mutation mutation = new Mutation(partition); int messagingVersion = MessagingService.instance().getVersion(destination); @@ -514,7 +514,7 @@ public class DataResolver extends ResponseResolver } private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions, - InetAddress source, + InetAddressAndPort source, DataLimits.Counter mergedResultCounter) { DataLimits.Counter singleResultCounter = @@ -557,7 +557,7 @@ public class DataResolver extends ResponseResolver */ private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> { - private final InetAddress source; + private final InetAddressAndPort source; private final DataLimits.Counter singleResultCounter; // unmerged per-source counter private final DataLimits.Counter mergedResultCounter; // merged end-result counter @@ -568,7 +568,7 @@ public class DataResolver extends ResponseResolver private final long queryStartNanoTime; - private ShortReadPartitionsProtection(InetAddress source, + private ShortReadPartitionsProtection(InetAddressAndPort source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter, long queryStartNanoTime) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index 4137e3a..dbd3667 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -26,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; @@ -41,8 +41,8 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); private final AtomicInteger acks = new AtomicInteger(0); - public DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, + public DatacenterSyncWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, @@ -63,7 +63,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) - for (InetAddress pending : pendingEndpoints) + for (InetAddressAndPort pending : pendingEndpoints) { responses.get(snitch.getDatacenter(pending)).incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 83dddcf..a8d7b28 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -17,10 +17,10 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collection; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -30,8 +30,8 @@ import org.apache.cassandra.db.WriteType; */ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> { - public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, + public DatacenterWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistencyLevel, Keyspace keyspace, Runnable callback, @@ -66,7 +66,7 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> } @Override - protected boolean waitingFor(InetAddress from) + protected boolean waitingFor(InetAddressAndPort from) { return consistencyLevel.isLocal(from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java index 24cb3d7..bc49d3b 100644 --- a/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java +++ b/src/java/org/apache/cassandra/service/IEndpointLifecycleSubscriber.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Interface on which interested parties can be notified of high level endpoint @@ -35,33 +35,33 @@ public interface IEndpointLifecycleSubscriber * * @param endpoint the newly added endpoint. */ - public void onJoinCluster(InetAddress endpoint); + public void onJoinCluster(InetAddressAndPort endpoint); /** * Called when a new node leave the cluster (decommission or removeToken). * * @param endpoint the endpoint that is leaving. */ - public void onLeaveCluster(InetAddress endpoint); + public void onLeaveCluster(InetAddressAndPort endpoint); /** * Called when a node is marked UP. * * @param endpoint the endpoint marked UP. */ - public void onUp(InetAddress endpoint); + public void onUp(InetAddressAndPort endpoint); /** * Called when a node is marked DOWN. * * @param endpoint the endpoint marked DOWN. */ - public void onDown(InetAddress endpoint); + public void onDown(InetAddressAndPort endpoint); /** * Called when a node has moved (to a new token). * * @param endpoint the endpoint that has moved. */ - public void onMove(InetAddress endpoint); + public void onMove(InetAddressAndPort endpoint); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/LoadBroadcaster.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java index 945dd2f..35c0b62 100644 --- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java +++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.StorageMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,21 +38,21 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber private static final Logger logger = LoggerFactory.getLogger(LoadBroadcaster.class); - private ConcurrentMap<InetAddress, Double> loadInfo = new ConcurrentHashMap<InetAddress, java.lang.Double>(); + private ConcurrentMap<InetAddressAndPort, Double> loadInfo = new ConcurrentHashMap<>(); private LoadBroadcaster() { Gossiper.instance.register(this); } - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) { if (state != ApplicationState.LOAD) return; loadInfo.put(endpoint, Double.valueOf(value.value)); } - public void onJoin(InetAddress endpoint, EndpointState epState) + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { VersionedValue localValue = epState.getApplicationState(ApplicationState.LOAD); if (localValue != null) @@ -61,20 +61,20 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber } } - public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onAlive(InetAddressAndPort endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddressAndPort endpoint, EndpointState state) {} - public void onRestart(InetAddress endpoint, EndpointState state) {} + public void onRestart(InetAddressAndPort endpoint, EndpointState state) {} - public void onRemove(InetAddress endpoint) + public void onRemove(InetAddressAndPort endpoint) { loadInfo.remove(endpoint); } - public Map<InetAddress, Double> getLoadInfo() + public Map<InetAddressAndPort, Double> getLoadInfo() { return Collections.unmodifiableMap(loadInfo); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 7ee6386..e7f30b4 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collections; import java.util.List; import java.util.Map; @@ -38,6 +37,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; @@ -56,7 +56,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> final SimpleCondition condition = new SimpleCondition(); private final long queryStartNanoTime; final int blockfor; - final List<InetAddress> endpoints; + final List<InetAddressAndPort> endpoints; private final ReadCommand command; private final ConsistencyLevel consistencyLevel; private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater @@ -65,14 +65,14 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures"); private volatile int failures = 0; - private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; + private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint; private final Keyspace keyspace; // TODO push this into ConsistencyLevel? /** * Constructor when response count has to be calculated and blocked for. */ - public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, long queryStartNanoTime) + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime) { this(resolver, consistencyLevel, @@ -83,7 +83,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> queryStartNanoTime); } - public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints, long queryStartNanoTime) + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime) { this.command = command; this.keyspace = keyspace; @@ -176,7 +176,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> /** * @return true if the message counts towards the blockfor threshold */ - private boolean waitingFor(InetAddress from) + private boolean waitingFor(InetAddressAndPort from) { return consistencyLevel.isDatacenterLocal() ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)) @@ -193,9 +193,9 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> public void response(ReadResponse result) { - MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(), + MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(), result, - Collections.<String, byte[]>emptyMap(), + Collections.emptyMap(), MessagingService.Verb.INTERNAL_RESPONSE, MessagingService.current_version); response(message); @@ -245,14 +245,14 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime); AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler); } } } @Override - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { int n = waitingFor(from) ? failuresUpdater.incrementAndGet(this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StartupChecks.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java index 55099fc..224fd5e 100644 --- a/src/java/org/apache/cassandra/service/StartupChecks.java +++ b/src/java/org/apache/cassandra/service/StartupChecks.java @@ -437,7 +437,7 @@ public class StartupChecks String storedDc = SystemKeyspace.getDatacenter(); if (storedDc != null) { - String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + String currentDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()); if (!storedDc.equals(currentDc)) { String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " + @@ -459,7 +459,7 @@ public class StartupChecks String storedRack = SystemKeyspace.getRack(); if (storedRack != null) { - String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress()); + String currentRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddressAndPort()); if (!storedRack.equals(currentRack)) { String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
