http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java index ce05e93..c7d45bf 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes @@ -47,19 +47,19 @@ public class ReduceHelper /** * Reduces the differences provided by the merkle trees to a minimum set of differences */ - public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter) + public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter) { - Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences); - Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>(); - ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder(); - for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet()) + Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences); + Map<InetAddressAndPort, Integer> outgoingStreamCounts = new HashMap<>(); + ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder(); + for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet()) { IncomingRepairStreamTracker tracker = trackerEntry.getValue(); HostDifferences rangesToFetch = new HostDifferences(); for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet()) { Range<Token> rangeToFetch = entry.getKey(); - for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter)) + for (InetAddressAndPort remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter)) rangesToFetch.addSingleRange(remoteNode, rangeToFetch); } mapBuilder.put(trackerEntry.getKey(), rangesToFetch); @@ -69,14 +69,14 @@ public class ReduceHelper } @VisibleForTesting - static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences) + static Map<InetAddressAndPort, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences) { - Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>(); + Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = new HashMap<>(); - for (InetAddress hostWithDifference : differences.keyHosts()) + for (InetAddressAndPort hostWithDifference : differences.keyHosts()) { HostDifferences hostDifferences = differences.get(hostWithDifference); - for (InetAddress differingHost : hostDifferences.hosts()) + for (InetAddressAndPort differingHost : hostDifferences.hosts()) { List<Range<Token>> differingRanges = hostDifferences.get(differingHost); // hostWithDifference has mismatching ranges differingRanges with differingHost: @@ -93,24 +93,24 @@ public class ReduceHelper } private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences, - Map<InetAddress, IncomingRepairStreamTracker> trackers, - InetAddress host) + Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers, + InetAddressAndPort host) { return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences)); } // greedily pick the nodes doing the least amount of streaming - private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode, + private static Collection<InetAddressAndPort> pickLeastStreaming(InetAddressAndPort streamingNode, StreamFromOptions toStreamFrom, - Map<InetAddress, Integer> outgoingStreamCounts, + Map<InetAddressAndPort, Integer> outgoingStreamCounts, PreferedNodeFilter filter) { - Set<InetAddress> retSet = new HashSet<>(); - for (Set<InetAddress> toStream : toStreamFrom.allStreams()) + Set<InetAddressAndPort> retSet = new HashSet<>(); + for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams()) { - InetAddress candidate = null; - Set<InetAddress> prefered = filter.apply(streamingNode, toStream); - for (InetAddress node : prefered) + InetAddressAndPort candidate = null; + Set<InetAddressAndPort> prefered = filter.apply(streamingNode, toStream); + for (InetAddressAndPort node : prefered) { if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0)) { @@ -120,7 +120,7 @@ public class ReduceHelper // ok, found no prefered hosts, try all of them if (candidate == null) { - for (InetAddress node : toStream) + for (InetAddressAndPort node : toStream) { if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0)) {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java index 4516f23..6070983 100644 --- a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java +++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.asymmetric; -import java.net.InetAddress; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -28,6 +27,7 @@ import com.google.common.collect.Sets; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Keeps track of where a node needs to stream a given range from. @@ -53,18 +53,18 @@ public class StreamFromOptions /** * Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling */ - private final Set<Set<InetAddress>> streamOptions = new HashSet<>(); + private final Set<Set<InetAddressAndPort>> streamOptions = new HashSet<>(); public StreamFromOptions(DifferenceHolder differences, Range<Token> range) { this(differences, range, Collections.emptySet()); } - private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing) + private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddressAndPort>> existing) { this.differences = differences; this.range = range; - for (Set<InetAddress> addresses : existing) + for (Set<InetAddressAndPort> addresses : existing) this.streamOptions.add(Sets.newHashSet(addresses)); } @@ -75,11 +75,11 @@ public class StreamFromOptions * range we are tracking, then just add it to the set with the identical remote nodes. Otherwise create a new group * of nodes containing this new node. */ - public void add(InetAddress streamFromNode) + public void add(InetAddressAndPort streamFromNode) { - for (Set<InetAddress> options : streamOptions) + for (Set<InetAddressAndPort> options : streamOptions) { - InetAddress first = options.iterator().next(); + InetAddressAndPort first = options.iterator().next(); if (!differences.hasDifferenceBetween(first, streamFromNode, range)) { options.add(streamFromNode); @@ -94,7 +94,7 @@ public class StreamFromOptions return new StreamFromOptions(differences, withRange, streamOptions); } - public Iterable<Set<InetAddress>> allStreams() + public Iterable<Set<InetAddressAndPort>> allStreams() { return streamOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java index c137346..78057e2 100644 --- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -33,6 +33,7 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.FailSession; import org.apache.cassandra.repair.messages.FinalizeCommit; import org.apache.cassandra.repair.messages.FinalizePromise; @@ -55,7 +56,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin; * There are 4 stages to a consistent incremental repair. * * <h1>Repair prepare</h1> - * First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddress, Set, RepairOption, List)} stuff + * First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddressAndPort, Set, RepairOption, List)} stuff * happens, which sends out {@link PrepareMessage} and creates a {@link ActiveRepairService.ParentRepairSession} * on the coordinator and each of the neighbors. * @@ -68,7 +69,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin; * coordinator indicating success or failure. If the pending anti-compaction fails, the local session state is set * to {@code FAILED}. * <p/> - * (see {@link LocalSessions#handlePrepareMessage(InetAddress, PrepareConsistentRequest)} + * (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, PrepareConsistentRequest)} * <p/> * Once the coordinator recieves positive {@code PrepareConsistentResponse} messages from all the participants, the * coordinator begins the normal repair process. @@ -99,8 +100,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin; * & {@link CoordinatorSession#finalizeCommit()} * <p/> * - * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)} - * & {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, FinalizeCommit)} + * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddressAndPort, FinalizePropose)} + * & {@link LocalSessions#handleFinalizeCommitMessage(InetAddressAndPort, FinalizeCommit)} * * <h1>Failure</h1> * If there are any failures or problems during the process above, the session will be failed. When a session is failed, @@ -187,11 +188,11 @@ public abstract class ConsistentSession private volatile State state; public final UUID sessionID; - public final InetAddress coordinator; + public final InetAddressAndPort coordinator; public final ImmutableSet<TableId> tableIds; public final long repairedAt; public final ImmutableSet<Range<Token>> ranges; - public final ImmutableSet<InetAddress> participants; + public final ImmutableSet<InetAddressAndPort> participants; ConsistentSession(AbstractBuilder builder) { @@ -260,11 +261,11 @@ public abstract class ConsistentSession { private State state; private UUID sessionID; - private InetAddress coordinator; + private InetAddressAndPort coordinator; private Set<TableId> ids; private long repairedAt; private Collection<Range<Token>> ranges; - private Set<InetAddress> participants; + private Set<InetAddressAndPort> participants; void withState(State state) { @@ -276,7 +277,7 @@ public abstract class ConsistentSession this.sessionID = sessionID; } - void withCoordinator(InetAddress coordinator) + void withCoordinator(InetAddressAndPort coordinator) { this.coordinator = coordinator; } @@ -301,7 +302,7 @@ public abstract class ConsistentSession this.ranges = ranges; } - void withParticipants(Set<InetAddress> peers) + void withParticipants(Set<InetAddressAndPort> peers) { this.participants = peers; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java index ba0025f..f52a28d 100644 --- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +38,7 @@ import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairSessionResult; @@ -58,7 +58,7 @@ public class CoordinatorSession extends ConsistentSession { private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class); - private final Map<InetAddress, State> participantStates = new HashMap<>(); + private final Map<InetAddressAndPort, State> participantStates = new HashMap<>(); private final SettableFuture<Boolean> prepareFuture = SettableFuture.create(); private final SettableFuture<Boolean> finalizeProposeFuture = SettableFuture.create(); @@ -69,7 +69,7 @@ public class CoordinatorSession extends ConsistentSession public CoordinatorSession(Builder builder) { super(builder); - for (InetAddress participant : participants) + for (InetAddressAndPort participant : participants) { participantStates.put(participant, State.PREPARING); } @@ -95,7 +95,7 @@ public class CoordinatorSession extends ConsistentSession super.setState(state); } - public synchronized void setParticipantState(InetAddress participant, State state) + public synchronized void setParticipantState(InetAddressAndPort participant, State state) { logger.trace("Setting participant {} to state {} for repair {}", participant, state, sessionID); Preconditions.checkArgument(participantStates.containsKey(participant), @@ -115,7 +115,7 @@ public class CoordinatorSession extends ConsistentSession synchronized void setAll(State state) { - for (InetAddress participant : participants) + for (InetAddressAndPort participant : participants) { setParticipantState(participant, state); } @@ -131,7 +131,7 @@ public class CoordinatorSession extends ConsistentSession return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED); } - protected void sendMessage(InetAddress destination, RepairMessage message) + protected void sendMessage(InetAddressAndPort destination, RepairMessage message) { logger.trace("Sending {} to {}", message, destination); MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer); @@ -144,14 +144,14 @@ public class CoordinatorSession extends ConsistentSession logger.debug("Beginning prepare phase of incremental repair session {}", sessionID); PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants); - for (final InetAddress participant : participants) + for (final InetAddressAndPort participant : participants) { sendMessage(participant, message); } return prepareFuture; } - public synchronized void handlePrepareResponse(InetAddress participant, boolean success) + public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success) { if (getState() == State.FAILED) { @@ -185,14 +185,14 @@ public class CoordinatorSession extends ConsistentSession Preconditions.checkArgument(allStates(State.REPAIRING)); logger.debug("Proposing finalization of repair session {}", sessionID); FinalizePropose message = new FinalizePropose(sessionID); - for (final InetAddress participant : participants) + for (final InetAddressAndPort participant : participants) { sendMessage(participant, message); } return finalizeProposeFuture; } - public synchronized void handleFinalizePromise(InetAddress participant, boolean success) + public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success) { if (getState() == State.FAILED) { @@ -221,7 +221,7 @@ public class CoordinatorSession extends ConsistentSession Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED)); logger.debug("Committing finalization of repair session {}", sessionID); FinalizeCommit message = new FinalizeCommit(sessionID); - for (final InetAddress participant : participants) + for (final InetAddressAndPort participant : participants) { sendMessage(participant, message); } @@ -233,7 +233,7 @@ public class CoordinatorSession extends ConsistentSession { logger.info("Incremental repair session {} failed", sessionID); FailSession message = new FailSession(sessionID); - for (final InetAddress participant : participants) + for (final InetAddressAndPort participant : participants) { if (participantStates.get(participant) != State.FAILED) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java index 211e0c1..bb84d0a 100644 --- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -26,6 +25,7 @@ import java.util.UUID; import com.google.common.base.Preconditions; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.FailSession; import org.apache.cassandra.repair.messages.FinalizePromise; import org.apache.cassandra.repair.messages.PrepareConsistentResponse; @@ -43,7 +43,7 @@ public class CoordinatorSessions return new CoordinatorSession(builder); } - public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddress> participants) + public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> participants) { Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java index 903aeb5..98b883a 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java @@ -21,10 +21,12 @@ package org.apache.cassandra.repair.consistent; import java.net.InetAddress; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -40,6 +42,7 @@ public class LocalSessionInfo public static final String LAST_UPDATE = "LAST_UPDATE"; public static final String COORDINATOR = "COORDINATOR"; public static final String PARTICIPANTS = "PARTICIPANTS"; + public static final String PARTICIPANTS_WP = "PARTICIPANTS_WP"; public static final String TABLES = "TABLES"; @@ -59,7 +62,8 @@ public class LocalSessionInfo m.put(STARTED, Integer.toString(session.getStartedAt())); m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate())); m.put(COORDINATOR, session.coordinator.toString()); - m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants, InetAddress::toString))); + m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants.stream().map(peer -> peer.address).collect(Collectors.toList()), InetAddress::getHostAddress))); + m.put(PARTICIPANTS_WP, Joiner.on(',').join(Iterables.transform(session.participants, InetAddressAndPort::toString))); m.put(TABLES, Joiner.on(',').join(Iterables.transform(session.tableIds, LocalSessionInfo::tableString))); return m; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index 4ef2c2c..e62f6fd 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair.consistent; import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.time.Instant; import java.util.Date; @@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; @@ -49,13 +51,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -141,13 +148,13 @@ public class LocalSessions } @VisibleForTesting - protected InetAddress getBroadcastAddress() + protected InetAddressAndPort getBroadcastAddressAndPort() { - return FBUtilities.getBroadcastAddress(); + return FBUtilities.getBroadcastAddressAndPort(); } @VisibleForTesting - protected boolean isAlive(InetAddress address) + protected boolean isAlive(InetAddressAndPort address) { return FailureDetector.instance.isAlive(address); } @@ -177,14 +184,14 @@ public class LocalSessions logger.info("Cancelling local repair session {}", sessionID); LocalSession session = getSession(sessionID); Preconditions.checkArgument(session != null, "Session {} does not exist", sessionID); - Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddress()), + Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddressAndPort()), "Cancel session %s from it's coordinator (%s) or use --force", sessionID, session.coordinator); setStateAndSave(session, FAILED); - for (InetAddress participant : session.participants) + for (InetAddressAndPort participant : session.participants) { - if (!participant.equals(getBroadcastAddress())) + if (!participant.equals(getBroadcastAddressAndPort())) sendMessage(participant, new FailSession(sessionID)); } } @@ -335,10 +342,12 @@ public class LocalSessions "repaired_at, " + "state, " + "coordinator, " + + "coordinator_port, " + "participants, " + + "participants_wp," + "ranges, " + "cfids) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; QueryProcessor.executeInternal(String.format(query, keyspace, table), session.sessionID, @@ -346,8 +355,10 @@ public class LocalSessions Date.from(Instant.ofEpochSecond(session.getLastUpdate())), Date.from(Instant.ofEpochMilli(session.repairedAt)), session.getState().ordinal(), - session.coordinator, - session.participants, + session.coordinator.address, + session.coordinator.port, + session.participants.stream().map(participant -> participant.address).collect(Collectors.toSet()), + session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()), serializeRanges(session.ranges), tableIdToUuid(session.tableIds)); } @@ -362,12 +373,27 @@ public class LocalSessions LocalSession.Builder builder = LocalSession.builder(); builder.withState(ConsistentSession.State.valueOf(row.getInt("state"))); builder.withSessionID(row.getUUID("parent_id")); - builder.withCoordinator(row.getInetAddress("coordinator")); + InetAddressAndPort coordinator = InetAddressAndPort.getByAddressOverrideDefaults( + row.getInetAddress("coordinator"), + row.getInt("coordinator_port")); + builder.withCoordinator(coordinator); builder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance))); builder.withRepairedAt(row.getTimestamp("repaired_at").getTime()); builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance))); - builder.withParticipants(row.getSet("participants", InetAddressType.instance)); - + //There is no cross version streaming and thus no cross version repair so assume that + //any valid repair sessions has the participants_wp column and any that doesn't is malformed + Set<String> participants = row.getSet("participants_wp", UTF8Type.instance); + builder.withParticipants(participants.stream().map(participant -> + { + try + { + return InetAddressAndPort.getByName(participant); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + }).collect(Collectors.toSet())); builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at"))); builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update"))); @@ -440,7 +466,7 @@ public class LocalSessions } @VisibleForTesting - LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers) + LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers) { LocalSession.Builder builder = LocalSession.builder(); builder.withState(ConsistentSession.State.PREPARING); @@ -464,7 +490,7 @@ public class LocalSessions return ActiveRepairService.instance.getParentRepairSession(sessionID); } - protected void sendMessage(InetAddress destination, RepairMessage message) + protected void sendMessage(InetAddressAndPort destination, RepairMessage message) { logger.trace("sending {} to {}", message, destination); MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer); @@ -536,12 +562,12 @@ public class LocalSessions * successfully. If the pending anti compaction fails, a failure message is sent to the coordinator, * cancelling the session. */ - public void handlePrepareMessage(InetAddress from, PrepareConsistentRequest request) + public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request) { logger.trace("received {} from {}", request, from); UUID sessionID = request.parentSession; - InetAddress coordinator = request.coordinator; - Set<InetAddress> peers = request.participants; + InetAddressAndPort coordinator = request.coordinator; + Set<InetAddressAndPort> peers = request.participants; ActiveRepairService.ParentRepairSession parentSession; try @@ -568,7 +594,7 @@ public class LocalSessions { logger.debug("Prepare phase for incremental repair session {} completed", sessionID); setStateAndSave(session, PREPARED); - sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), true)); + sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true)); executor.shutdown(); } @@ -587,7 +613,7 @@ public class LocalSessions { logger.error("Prepare phase for incremental repair session {} failed", sessionID, t); } - sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), false)); + sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false)); failSession(sessionID, false); executor.shutdown(); } @@ -604,7 +630,7 @@ public class LocalSessions } } - public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose propose) + public void handleFinalizeProposeMessage(InetAddressAndPort from, FinalizePropose propose) { logger.trace("received {} from {}", propose, from); UUID sessionID = propose.sessionID; @@ -629,7 +655,7 @@ public class LocalSessions */ syncTable(); - sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddress(), true)); + sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true)); logger.debug("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", sessionID); } catch (IllegalArgumentException e) @@ -659,7 +685,7 @@ public class LocalSessions * as part of the compaction process, and avoids having to worry about in progress compactions interfering with the * promotion. */ - public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit commit) + public void handleFinalizeCommitMessage(InetAddressAndPort from, FinalizeCommit commit) { logger.trace("received {} from {}", commit, from); UUID sessionID = commit.sessionID; @@ -674,7 +700,7 @@ public class LocalSessions logger.info("Finalized local repair session {}", sessionID); } - public void handleFailSessionMessage(InetAddress from, FailSession msg) + public void handleFailSessionMessage(InetAddressAndPort from, FailSession msg) { logger.trace("received {} from {}", msg, from); failSession(msg.sessionID, false); @@ -684,16 +710,16 @@ public class LocalSessions { logger.debug("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID); StatusRequest request = new StatusRequest(session.sessionID); - for (InetAddress participant : session.participants) + for (InetAddressAndPort participant : session.participants) { - if (!getBroadcastAddress().equals(participant) && isAlive(participant)) + if (!getBroadcastAddressAndPort().equals(participant) && isAlive(participant)) { sendMessage(participant, request); } } } - public void handleStatusRequest(InetAddress from, StatusRequest request) + public void handleStatusRequest(InetAddressAndPort from, StatusRequest request) { logger.trace("received {} from {}", request, from); UUID sessionID = request.sessionID; @@ -710,7 +736,7 @@ public class LocalSessions } } - public void handleStatusResponse(InetAddress from, StatusResponse response) + public void handleStatusResponse(InetAddressAndPort from, StatusResponse response) { logger.trace("received {} from {}", response, from); UUID sessionID = response.sessionID; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java index 015b558..156fde7 100644 --- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java +++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.consistent; -import java.net.InetAddress; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -27,6 +26,7 @@ import java.util.Objects; import com.google.common.collect.Lists; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.RepairResult; import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.SyncStat; @@ -42,14 +42,14 @@ public class SyncStatSummary private static class Session { - final InetAddress src; - final InetAddress dst; + final InetAddressAndPort src; + final InetAddressAndPort dst; int files = 0; long bytes = 0; long ranges = 0; - Session(InetAddress src, InetAddress dst) + Session(InetAddressAndPort src, InetAddressAndPort dst) { this.src = src; this.dst = dst; @@ -84,7 +84,7 @@ public class SyncStatSummary int ranges = -1; boolean totalsCalculated = false; - final Map<Pair<InetAddress, InetAddress>, Session> sessions = new HashMap<>(); + final Map<Pair<InetAddressAndPort, InetAddressAndPort>, Session> sessions = new HashMap<>(); Table(String keyspace, String table) { @@ -92,9 +92,9 @@ public class SyncStatSummary this.table = table; } - Session getOrCreate(InetAddress from, InetAddress to) + Session getOrCreate(InetAddressAndPort from, InetAddressAndPort to) { - Pair<InetAddress, InetAddress> k = Pair.create(from, to); + Pair<InetAddressAndPort, InetAddressAndPort> k = Pair.create(from, to); if (!sessions.containsKey(k)) { sessions.put(k, new Session(from, to)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java index b75ad7f..6d76269 100644 --- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairJobDesc; @@ -39,13 +40,13 @@ public class AsymmetricSyncRequest extends RepairMessage { public static MessageSerializer serializer = new SyncRequestSerializer(); - public final InetAddress initiator; - public final InetAddress fetchingNode; - public final InetAddress fetchFrom; + public final InetAddressAndPort initiator; + public final InetAddressAndPort fetchingNode; + public final InetAddressAndPort fetchFrom; public final Collection<Range<Token>> ranges; public final PreviewKind previewKind; - public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind) + public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind) { super(Type.ASYMMETRIC_SYNC_REQUEST, desc); this.initiator = initiator; @@ -80,9 +81,9 @@ public class AsymmetricSyncRequest extends RepairMessage public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException { RepairJobDesc.serializer.serialize(message.desc, out, version); - CompactEndpointSerializationHelper.serialize(message.initiator, out); - CompactEndpointSerializationHelper.serialize(message.fetchingNode, out); - CompactEndpointSerializationHelper.serialize(message.fetchFrom, out); + CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version); + CompactEndpointSerializationHelper.instance.serialize(message.fetchingNode, out, version); + CompactEndpointSerializationHelper.instance.serialize(message.fetchFrom, out, version); out.writeInt(message.ranges.size()); for (Range<Token> range : message.ranges) { @@ -95,9 +96,9 @@ public class AsymmetricSyncRequest extends RepairMessage public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); - InetAddress owner = CompactEndpointSerializationHelper.deserialize(in); - InetAddress src = CompactEndpointSerializationHelper.deserialize(in); - InetAddress dst = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version); int rangesCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangesCount); for (int i = 0; i < rangesCount; ++i) @@ -109,7 +110,9 @@ public class AsymmetricSyncRequest extends RepairMessage public long serializedSize(AsymmetricSyncRequest message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator); + size += CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version); + size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchingNode, version); + size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchFrom, version); size += TypeSizes.sizeof(message.ranges.size()); for (Range<Token> range : message.ranges) size += AbstractBounds.tokenSerializer.serializedSize(range, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java index 6c28347..449748a 100644 --- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java +++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java @@ -19,24 +19,22 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.serializers.InetAddressSerializer; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.utils.UUIDSerializer; public class FinalizePromise extends RepairMessage { public final UUID sessionID; - public final InetAddress participant; + public final InetAddressAndPort participant; public final boolean promised; - public FinalizePromise(UUID sessionID, InetAddress participant, boolean promised) + public FinalizePromise(UUID sessionID, InetAddressAndPort participant, boolean promised) { super(Type.FINALIZE_PROMISE, null); assert sessionID != null; @@ -68,26 +66,24 @@ public class FinalizePromise extends RepairMessage public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>() { - private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; - public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException { UUIDSerializer.serializer.serialize(msg.sessionID, out, version); - ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), out); + CompactEndpointSerializationHelper.instance.serialize(msg.participant, out, version); out.writeBoolean(msg.promised); } public FinalizePromise deserialize(DataInputPlus in, int version) throws IOException { return new FinalizePromise(UUIDSerializer.serializer.deserialize(in, version), - inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)), + CompactEndpointSerializationHelper.instance.deserialize(in, version), in.readBoolean()); } public long serializedSize(FinalizePromise msg, int version) { long size = UUIDSerializer.serializer.serializedSize(msg.sessionID, version); - size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant)); + size += CompactEndpointSerializationHelper.instance.serializedSize(msg.participant, version); size += TypeSizes.sizeof(msg.promised); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java index 57056ef..9aae256 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -29,18 +28,17 @@ import com.google.common.collect.ImmutableSet; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.serializers.InetAddressSerializer; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.utils.UUIDSerializer; public class PrepareConsistentRequest extends RepairMessage { public final UUID parentSession; - public final InetAddress coordinator; - public final Set<InetAddress> participants; + public final InetAddressAndPort coordinator; + public final Set<InetAddressAndPort> participants; - public PrepareConsistentRequest(UUID parentSession, InetAddress coordinator, Set<InetAddress> participants) + public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> participants) { super(Type.CONSISTENT_REQUEST, null); assert parentSession != null; @@ -82,28 +80,27 @@ public class PrepareConsistentRequest extends RepairMessage public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>() { - private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException { UUIDSerializer.serializer.serialize(request.parentSession, out, version); - ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator), out); + CompactEndpointSerializationHelper.instance.serialize(request.coordinator, out, version); out.writeInt(request.participants.size()); - for (InetAddress peer : request.participants) + for (InetAddressAndPort peer : request.participants) { - ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out); + CompactEndpointSerializationHelper.instance.serialize(peer, out, version); } } public PrepareConsistentRequest deserialize(DataInputPlus in, int version) throws IOException { UUID sessionId = UUIDSerializer.serializer.deserialize(in, version); - InetAddress coordinator = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)); + InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version); int numPeers = in.readInt(); - Set<InetAddress> peers = new HashSet<>(numPeers); + Set<InetAddressAndPort> peers = new HashSet<>(numPeers); for (int i = 0; i < numPeers; i++) { - InetAddress peer = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)); + InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version); peers.add(peer); } return new PrepareConsistentRequest(sessionId, coordinator, peers); @@ -112,11 +109,11 @@ public class PrepareConsistentRequest extends RepairMessage public long serializedSize(PrepareConsistentRequest request, int version) { long size = UUIDSerializer.serializer.serializedSize(request.parentSession, version); - size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator)); + size += CompactEndpointSerializationHelper.instance.serializedSize(request.coordinator, version); size += TypeSizes.sizeof(request.participants.size()); - for (InetAddress peer : request.participants) + for (InetAddressAndPort peer : request.participants) { - size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer)); + size += CompactEndpointSerializationHelper.instance.serializedSize(peer, version); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java index cf4410a..630f18e 100644 --- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java +++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java @@ -19,24 +19,22 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.UUID; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.serializers.InetAddressSerializer; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.utils.UUIDSerializer; public class PrepareConsistentResponse extends RepairMessage { public final UUID parentSession; - public final InetAddress participant; + public final InetAddressAndPort participant; public final boolean success; - public PrepareConsistentResponse(UUID parentSession, InetAddress participant, boolean success) + public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort participant, boolean success) { super(Type.CONSISTENT_RESPONSE, null); assert parentSession != null; @@ -68,25 +66,24 @@ public class PrepareConsistentResponse extends RepairMessage public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>() { - private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance; public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException { UUIDSerializer.serializer.serialize(response.parentSession, out, version); - ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant), out); + CompactEndpointSerializationHelper.instance.serialize(response.participant, out, version); out.writeBoolean(response.success); } public PrepareConsistentResponse deserialize(DataInputPlus in, int version) throws IOException { return new PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version), - inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)), + CompactEndpointSerializationHelper.instance.deserialize(in, version), in.readBoolean()); } public long serializedSize(PrepareConsistentResponse response, int version) { long size = UUIDSerializer.serializer.serializedSize(response.parentSession, version); - size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant)); + size += CompactEndpointSerializationHelper.instance.serializedSize(response.participant, version); size += TypeSizes.sizeof(response.success); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java index 7b68daf..1f1344d 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -26,6 +25,7 @@ import java.util.Objects; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.NodePair; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.streaming.SessionSummary; @@ -53,7 +53,7 @@ public class SyncComplete extends RepairMessage this.summaries = summaries; } - public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries) + public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries) { super(Type.SYNC_COMPLETE, desc); this.summaries = summaries; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index 01601e2..a0bf4e2 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -30,6 +29,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairJobDesc; @@ -45,14 +45,14 @@ public class SyncRequest extends RepairMessage { public static MessageSerializer serializer = new SyncRequestSerializer(); - public final InetAddress initiator; - public final InetAddress src; - public final InetAddress dst; + public final InetAddressAndPort initiator; + public final InetAddressAndPort src; + public final InetAddressAndPort dst; public final Collection<Range<Token>> ranges; public final PreviewKind previewKind; - public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind) - { + public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind) + { super(Type.SYNC_REQUEST, desc); this.initiator = initiator; this.src = src; @@ -87,9 +87,9 @@ public class SyncRequest extends RepairMessage public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException { RepairJobDesc.serializer.serialize(message.desc, out, version); - CompactEndpointSerializationHelper.serialize(message.initiator, out); - CompactEndpointSerializationHelper.serialize(message.src, out); - CompactEndpointSerializationHelper.serialize(message.dst, out); + CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version); + CompactEndpointSerializationHelper.instance.serialize(message.src, out, version); + CompactEndpointSerializationHelper.instance.serialize(message.dst, out, version); out.writeInt(message.ranges.size()); for (Range<Token> range : message.ranges) { @@ -102,9 +102,9 @@ public class SyncRequest extends RepairMessage public SyncRequest deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); - InetAddress owner = CompactEndpointSerializationHelper.deserialize(in); - InetAddress src = CompactEndpointSerializationHelper.deserialize(in); - InetAddress dst = CompactEndpointSerializationHelper.deserialize(in); + InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version); int rangesCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangesCount); for (int i = 0; i < rangesCount; ++i) @@ -116,7 +116,7 @@ public class SyncRequest extends RepairMessage public long serializedSize(SyncRequest message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator); + size += 3 * CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version); size += TypeSizes.sizeof(message.ranges.size()); for (Range<Token> range : message.ranges) size += AbstractBounds.tokenSerializer.serializedSize(range, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index ef19c25..c8881e5 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -18,7 +18,6 @@ package org.apache.cassandra.schema; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.*; import java.lang.management.ManagementFactory; @@ -39,6 +38,7 @@ import org.apache.cassandra.gms.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -59,10 +59,10 @@ public class MigrationManager private MigrationManager() {} - public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) + public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state) { UUID schemaVersion = state.getSchemaVersion(); - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null) + if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null) maybeScheduleSchemaPull(schemaVersion, endpoint); } @@ -70,7 +70,7 @@ public class MigrationManager * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) + private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint) { if (Schema.instance.getVersion() == null) { @@ -130,7 +130,7 @@ public class MigrationManager } } - private static Future<?> submitMigrationTask(InetAddress endpoint) + private static Future<?> submitMigrationTask(InetAddressAndPort endpoint) { /* * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are @@ -139,7 +139,7 @@ public class MigrationManager return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); } - static boolean shouldPullSchemaFrom(InetAddress endpoint) + static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint) { /* * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) @@ -427,7 +427,7 @@ public class MigrationManager FBUtilities.waitOnFuture(announce(mutations)); } - private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema) + private static void pushSchemaMutation(InetAddressAndPort endpoint, Collection<Mutation> schema) { MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE, schema, @@ -446,10 +446,10 @@ public class MigrationManager } }); - for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) + for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers()) { // only push schema to nodes with known and equal versions - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) pushSchemaMutation(endpoint, schema); @@ -486,11 +486,11 @@ public class MigrationManager Schema.instance.clear(); - Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); - liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort()); // force migration if there are nodes around - for (InetAddress node : liveEndpoints) + for (InetAddressAndPort node : liveEndpoints) { if (shouldPullSchemaFrom(node)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java index 73e396d..6ff206a 100644 --- a/src/java/org/apache/cassandra/schema/MigrationTask.java +++ b/src/java/org/apache/cassandra/schema/MigrationTask.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.schema; -import java.net.InetAddress; import java.util.Collection; import java.util.EnumSet; import java.util.Set; @@ -32,6 +31,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.SystemKeyspace.BootstrapState; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -46,9 +46,9 @@ final class MigrationTask extends WrappedRunnable private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); - private final InetAddress endpoint; + private final InetAddressAndPort endpoint; - MigrationTask(InetAddress endpoint) + MigrationTask(InetAddressAndPort endpoint) { this.endpoint = endpoint; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 7ff0b9b..e06131e 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collection; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -38,6 +37,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -60,12 +60,12 @@ public abstract class AbstractReadExecutor private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class); protected final ReadCommand command; - protected final List<InetAddress> targetReplicas; + protected final List<InetAddressAndPort> targetReplicas; protected final ReadCallback handler; protected final TraceState traceState; protected final ColumnFamilyStore cfs; - AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime) + AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) { this.command = command; this.targetReplicas = targetReplicas; @@ -78,27 +78,27 @@ public abstract class AbstractReadExecutor // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once // we stop being compatible with pre-3.0 nodes. int digestVersion = MessagingService.current_version; - for (InetAddress replica : targetReplicas) + for (InetAddressAndPort replica : targetReplicas) digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica)); command.setDigestVersion(digestVersion); } - protected void makeDataRequests(Iterable<InetAddress> endpoints) + protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints) { makeRequests(command, endpoints); } - protected void makeDigestRequests(Iterable<InetAddress> endpoints) + protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints) { makeRequests(command.copyAsDigestQuery(), endpoints); } - private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints) + private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints) { boolean hasLocalEndpoint = false; - for (InetAddress endpoint : endpoints) + for (InetAddressAndPort endpoint : endpoints) { if (StorageProxy.canDoLocalRequest(endpoint)) { @@ -132,7 +132,7 @@ public abstract class AbstractReadExecutor * * @return target replicas + the extra replica, *IF* we speculated. */ - public abstract Collection<InetAddress> getContactedReplicas(); + public abstract Collection<InetAddressAndPort> getContactedReplicas(); /** * send the initial set of requests @@ -184,12 +184,12 @@ public abstract class AbstractReadExecutor public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException { Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey()); + List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey()); // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM ? ReadRepairDecision.NONE : newReadRepairDecision(command.metadata()); - List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision); + List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision); // Throw UAE early if we don't have enough replicas. consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); @@ -223,12 +223,12 @@ public abstract class AbstractReadExecutor } // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs. - InetAddress extraReplica = allReplicas.get(targetReplicas.size()); + InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size()); // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so // we might have to find a replacement that's not already in targetReplicas. if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica)) { - for (InetAddress address : allReplicas) + for (InetAddressAndPort address : allReplicas) { if (!targetReplicas.contains(address)) { @@ -269,7 +269,7 @@ public abstract class AbstractReadExecutor */ private final boolean logFailedSpeculation; - public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation) + public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation) { super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); this.logFailedSpeculation = logFailedSpeculation; @@ -290,7 +290,7 @@ public abstract class AbstractReadExecutor } } - public Collection<InetAddress> getContactedReplicas() + public Collection<InetAddressAndPort> getContactedReplicas() { return targetReplicas; } @@ -304,7 +304,7 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, - List<InetAddress> targetReplicas, + List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) { super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); @@ -314,7 +314,7 @@ public abstract class AbstractReadExecutor { // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know // that the last replica in our list is "extra." - List<InetAddress> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1); + List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1); if (handler.blockfor < initialReplicas.size()) { @@ -347,7 +347,7 @@ public abstract class AbstractReadExecutor if (handler.resolver.isDataPresent()) retryCommand = command.copyAsDigestQuery(); - InetAddress extraReplica = Iterables.getLast(targetReplicas); + InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas); if (traceState != null) traceState.trace("speculating read retry on {}", extraReplica); logger.trace("speculating read retry on {}", extraReplica); @@ -355,7 +355,7 @@ public abstract class AbstractReadExecutor } } - public Collection<InetAddress> getContactedReplicas() + public Collection<InetAddressAndPort> getContactedReplicas() { return speculated ? targetReplicas @@ -378,7 +378,7 @@ public abstract class AbstractReadExecutor ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, - List<InetAddress> targetReplicas, + List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) { super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime); @@ -389,7 +389,7 @@ public abstract class AbstractReadExecutor // no-op } - public Collection<InetAddress> getContactedReplicas() + public Collection<InetAddressAndPort> getContactedReplicas() { return targetReplicas; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index b5eaadb..9d800a0 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,6 +34,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.utils.concurrent.SimpleCondition; @@ -47,15 +47,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private AtomicInteger responsesAndExpirations; private final SimpleCondition condition = new SimpleCondition(); protected final Keyspace keyspace; - protected final Collection<InetAddress> naturalEndpoints; + protected final Collection<InetAddressAndPort> naturalEndpoints; public final ConsistencyLevel consistencyLevel; protected final Runnable callback; - protected final Collection<InetAddress> pendingEndpoints; + protected final Collection<InetAddressAndPort> pendingEndpoints; protected final WriteType writeType; private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; - private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; + private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint; private final long queryStartNanoTime; private volatile boolean supportsBackPressure = true; @@ -72,8 +72,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW * @param queryStartNanoTime */ protected AbstractWriteResponseHandler(Keyspace keyspace, - Collection<InetAddress> naturalEndpoints, - Collection<InetAddress> pendingEndpoints, + Collection<InetAddressAndPort> naturalEndpoints, + Collection<InetAddressAndPort> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback, WriteType writeType, @@ -208,7 +208,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW /** * @return true if the message counts towards the totalBlockFor() threshold */ - protected boolean waitingFor(InetAddress from) + protected boolean waitingFor(InetAddressAndPort from) { return true; } @@ -236,7 +236,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW } @Override - public void onFailure(InetAddress from, RequestFailureReason failureReason) + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { logger.trace("Got failure from {}", from); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
