http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java new file mode 100644 index 0000000..7eedab7 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +/** + * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica. + */ +public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler +{ + private final TraceState state = Tracing.instance.get(); + + private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class); + + private final boolean remoteIsTransient; + private final UUID pendingRepair; + private final boolean pullRepair; + + public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind) + { + super(desc, r1, r2, previewKind); + this.remoteIsTransient = remoteIsTransient; + this.pendingRepair = pendingRepair; + this.pullRepair = pullRepair; + } + + @VisibleForTesting + StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences) + { + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) + .listeners(this) + .flushBeforeTransfer(pendingRepair == null) + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node + + if (!pullRepair && !remoteIsTransient) + { + // send ranges to the remote node if we are not performing a pull repair + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); + } + + return plan; + } + + /** + * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback + * that will be called out of band once the streams complete. + */ + @Override + protected void startSync(List<Range<Token>> differences) + { + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); + // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding + InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; + + String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + Tracing.traceRepair(message); + + createStreamPlan(dst, differences).execute(); + } + + public void handleStreamEvent(StreamEvent event) + { + if (state == null) + return; + switch (event.eventType) + { + case STREAM_PREPARED: + StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; + state.trace("Streaming session with {} prepared", spe.session.peer); + break; + case STREAM_COMPLETE: + StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; + state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); + break; + case FILE_PROGRESS: + ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; + state.trace("{}/{} ({}%) {} idx:{}{}", + new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), + FBUtilities.prettyPrintMemory(pi.totalBytes), + pi.currentBytes * 100 / pi.totalBytes, + pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from", + pi.sessionIndex, + pi.peer }); + } + } + + public void onSuccess(StreamState result) + { + String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + Tracing.traceRepair(message); + set(stat.withSummaries(result.createSummaries())); + finished(); + } + + public void onFailure(Throwable t) + { + setException(t); + finished(); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java new file mode 100644 index 0000000..1f2740f --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.List; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.AsymmetricSyncRequest; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +/** + * SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node + * to repair(stream) data with other replica. + * + * When SymmetricRemoteSyncTask receives SyncComplete from remote node, task completes. + */ +public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements CompletableRemoteSyncTask +{ + private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class); + + public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) + { + super(desc, r1, r2, previewKind); + } + + void sendRequest(RepairMessage request, InetAddressAndPort to) + { + MessagingService.instance().sendOneWay(request.createMessage(), to); + } + + @Override + protected void startSync(List<Range<Token>> differences) + { + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); + + SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); + String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + Tracing.traceRepair(message); + sendRequest(request, request.src); + } + + public void syncComplete(boolean success, List<SessionSummary> summaries) + { + if (success) + { + set(stat.withSummaries(summaries)); + } + else + { + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + } + finished(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java new file mode 100644 index 0000000..3da2293 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.MerkleTrees; + +/** + * SymmetricSyncTask will calculate the difference of MerkleTree between two nodes + * and perform necessary operation to repair replica. + */ +public abstract class SymmetricSyncTask extends AbstractSyncTask +{ + private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class); + + protected final RepairJobDesc desc; + protected final TreeResponse r1; + protected final TreeResponse r2; + protected final PreviewKind previewKind; + + protected volatile SyncStat stat; + protected long startTime = Long.MIN_VALUE; + + public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) + { + this.desc = desc; + this.r1 = r1; + this.r2 = r2; + this.previewKind = previewKind; + } + + /** + * Compares trees, and triggers repairs for any ranges that mismatch. + */ + public void run() + { + startTime = System.currentTimeMillis(); + // compare trees, and collect differences + List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); + + stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); + + // choose a repair method based on the significance of the difference + String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); + if (differences.isEmpty()) + { + logger.info(String.format(format, "are consistent")); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); + set(stat); + return; + } + + // non-0 difference: perform streaming repair + logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); + startSync(differences); + } + + public SyncStat getCurrentStat() + { + return stat; + } + + protected void finished() + { + if (startTime != Long.MIN_VALUE) + Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java deleted file mode 100644 index f7cf5f1..0000000 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.google.common.util.concurrent.AbstractFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.MerkleTrees; - -/** - * SyncTask will calculate the difference of MerkleTree between two nodes - * and perform necessary operation to repair replica. - */ -public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable -{ - private static Logger logger = LoggerFactory.getLogger(SyncTask.class); - - protected final RepairJobDesc desc; - protected final TreeResponse r1; - protected final TreeResponse r2; - protected final PreviewKind previewKind; - - protected volatile SyncStat stat; - protected long startTime = Long.MIN_VALUE; - - public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) - { - this.desc = desc; - this.r1 = r1; - this.r2 = r2; - this.previewKind = previewKind; - } - - /** - * Compares trees, and triggers repairs for any ranges that mismatch. - */ - public void run() - { - startTime = System.currentTimeMillis(); - // compare trees, and collect differences - List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); - - stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); - - // choose a repair method based on the significance of the difference - String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); - if (differences.isEmpty()) - { - logger.info(String.format(format, "are consistent")); - Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); - set(stat); - return; - } - - // non-0 difference: perform streaming repair - logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); - Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); - startSync(differences); - } - - public SyncStat getCurrentStat() - { - return stat; - } - - protected void finished() - { - if (startTime != Long.MIN_VALUE) - Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); - } - - protected abstract void startSync(List<Range<Token>> differences); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index a85a1e5..fc09e71 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -185,13 +185,13 @@ public final class SystemDistributedKeyspace processSilent(fmtQuery); } - public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddressAndPort> endpoints) + public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange) { InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); Set<String> participants = Sets.newHashSet(); Set<String> participants_v2 = Sets.newHashSet(); - for (InetAddressAndPort endpoint : endpoints) + for (InetAddressAndPort endpoint : commonRange.endpoints) { participants.add(endpoint.getHostAddress(false)); participants_v2.add(endpoint.toString()); @@ -203,7 +203,7 @@ public final class SystemDistributedKeyspace for (String cfname : cfnames) { - for (Range<Token> range : ranges) + for (Range<Token> range : commonRange.ranges) { String fmtQry = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, keyspaceName, http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index ed25166..4089e77 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -47,11 +47,14 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; + +import org.apache.cassandra.locator.RangesAtEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.repair.KeyspaceRepairManager; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.marshal.UTF8Type; @@ -543,9 +546,35 @@ public class LocalSessions } @VisibleForTesting - ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor) + ListenableFuture prepareSession(KeyspaceRepairManager repairManager, + UUID sessionID, + Collection<ColumnFamilyStore> tables, + RangesAtEndpoint tokenRanges, + ExecutorService executor) + { + return repairManager.prepareIncrementalRepair(sessionID, tables, tokenRanges, executor); + } + + RangesAtEndpoint filterLocalRanges(String keyspace, Set<Range<Token>> ranges) { - return repairManager.prepareIncrementalRepair(sessionID, tables, ranges, executor); + RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace); + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(localRanges.endpoint()); + for (Range<Token> range : ranges) + { + for (Replica replica : localRanges) + { + if (replica.range().equals(range)) + { + builder.add(replica); + } + else if (replica.contains(range)) + { + builder.add(replica.decorateSubrange(range)); + } + } + + } + return builder.build(); } /** @@ -582,7 +611,8 @@ public class LocalSessions ExecutorService executor = Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size()); KeyspaceRepairManager repairManager = parentSession.getKeyspace().getRepairManager(); - ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), parentSession.getRanges(), executor); + RangesAtEndpoint tokenRanges = filterLocalRanges(parentSession.getKeyspace().getName(), parentSession.getRanges()); + ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), tokenRanges, executor); Futures.addCallback(repairPreparation, new FutureCallback<Object>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/KeyspaceParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java b/src/java/org/apache/cassandra/schema/KeyspaceParams.java index 68ac5e4..cc46474 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java @@ -74,6 +74,11 @@ public final class KeyspaceParams return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor)); } + public static KeyspaceParams simple(String replicationFactor) + { + return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor)); + } + public static KeyspaceParams simpleTransient(int replicationFactor) { return new KeyspaceParams(false, ReplicationParams.simple(replicationFactor)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/ReplicationParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/ReplicationParams.java b/src/java/org/apache/cassandra/schema/ReplicationParams.java index 21c029e..21e19d6 100644 --- a/src/java/org/apache/cassandra/schema/ReplicationParams.java +++ b/src/java/org/apache/cassandra/schema/ReplicationParams.java @@ -51,6 +51,11 @@ public final class ReplicationParams return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor))); } + static ReplicationParams simple(String replicationFactor) + { + return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", replicationFactor)); + } + static ReplicationParams nts(Object... args) { assert args.length % 2 == 0; @@ -58,9 +63,7 @@ public final class ReplicationParams Map<String, String> options = new HashMap<>(); for (int i = 0; i < args.length; i += 2) { - String dc = (String) args[i]; - Integer rf = (Integer) args[i + 1]; - options.put(dc, rf.toString()); + options.put((String) args[i], args[i + 1].toString()); } return new ReplicationParams(NetworkTopologyStrategy.class, options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index a88aebb..22a8c39 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -137,6 +137,7 @@ public final class SchemaKeyspace + "min_index_interval int," + "read_repair_chance double," // no longer used, left for drivers' sake + "speculative_retry text," + + "speculative_write_threshold text," + "cdc boolean," + "read_repair text," + "PRIMARY KEY ((keyspace_name), table_name))"); @@ -203,6 +204,7 @@ public final class SchemaKeyspace + "min_index_interval int," + "read_repair_chance double," // no longer used, left for drivers' sake + "speculative_retry text," + + "speculative_write_threshold text," + "cdc boolean," + "read_repair text," + "PRIMARY KEY ((keyspace_name), view_name))"); @@ -563,6 +565,7 @@ public final class SchemaKeyspace .add("min_index_interval", params.minIndexInterval) .add("read_repair_chance", 0.0) // no longer used, left for drivers' sake .add("speculative_retry", params.speculativeRetry.toString()) + .add("speculative_write_threshold", params.speculativeWriteThreshold.toString()) .add("crc_check_chance", params.crcCheckChance) .add("caching", params.caching.asMap()) .add("compaction", params.compaction.asMap()) @@ -991,6 +994,7 @@ public final class SchemaKeyspace .minIndexInterval(row.getInt("min_index_interval")) .crcCheckChance(row.getDouble("crc_check_chance")) .speculativeRetry(SpeculativeRetryPolicy.fromString(row.getString("speculative_retry"))) + .speculativeWriteThreshold(SpeculativeRetryPolicy.fromString(row.getString("speculative_write_threshold"))) .cdc(row.has("cdc") && row.getBoolean("cdc")) .readRepair(getReadRepairStrategy(row)) .build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/TableMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java index 6466e2e..10edf4d 100644 --- a/src/java/org/apache/cassandra/schema/TableMetadata.java +++ b/src/java/org/apache/cassandra/schema/TableMetadata.java @@ -834,6 +834,12 @@ public final class TableMetadata return this; } + public Builder speculativeWriteThreshold(SpeculativeRetryPolicy val) + { + params.speculativeWriteThreshold(val); + return this; + } + public Builder extensions(Map<String, ByteBuffer> val) { params.extensions(val); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/TableParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java index afbf26c..0bba5e1 100644 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -51,6 +51,7 @@ public final class TableParams MEMTABLE_FLUSH_PERIOD_IN_MS, MIN_INDEX_INTERVAL, SPECULATIVE_RETRY, + SPECULATIVE_WRITE_THRESHOLD, CRC_CHECK_CHANCE, CDC, READ_REPAIR; @@ -71,6 +72,7 @@ public final class TableParams public final int minIndexInterval; public final int maxIndexInterval; public final SpeculativeRetryPolicy speculativeRetry; + public final SpeculativeRetryPolicy speculativeWriteThreshold; public final CachingParams caching; public final CompactionParams compaction; public final CompressionParams compression; @@ -91,6 +93,7 @@ public final class TableParams minIndexInterval = builder.minIndexInterval; maxIndexInterval = builder.maxIndexInterval; speculativeRetry = builder.speculativeRetry; + speculativeWriteThreshold = builder.speculativeWriteThreshold; caching = builder.caching; compaction = builder.compaction; compression = builder.compression; @@ -118,6 +121,7 @@ public final class TableParams .memtableFlushPeriodInMs(params.memtableFlushPeriodInMs) .minIndexInterval(params.minIndexInterval) .speculativeRetry(params.speculativeRetry) + .speculativeWriteThreshold(params.speculativeWriteThreshold) .extensions(params.extensions) .cdc(params.cdc) .readRepair(params.readRepair); @@ -260,6 +264,7 @@ public final class TableParams private int minIndexInterval = 128; private int maxIndexInterval = 2048; private SpeculativeRetryPolicy speculativeRetry = PercentileSpeculativeRetryPolicy.NINETY_NINE_P; + private SpeculativeRetryPolicy speculativeWriteThreshold = PercentileSpeculativeRetryPolicy.NINETY_NINE_P; private CachingParams caching = CachingParams.DEFAULT; private CompactionParams compaction = CompactionParams.DEFAULT; private CompressionParams compression = CompressionParams.DEFAULT; @@ -330,6 +335,12 @@ public final class TableParams return this; } + public Builder speculativeWriteThreshold(SpeculativeRetryPolicy val) + { + speculativeWriteThreshold = val; + return this; + } + public Builder caching(CachingParams val) { caching = val; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 9d800a0..e817cc8 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -17,28 +17,35 @@ */ package org.apache.cassandra.service; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.stream.Collectors; -import com.google.common.collect.Iterables; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.ReplicaLayout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.WriteType; -import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.utils.concurrent.SimpleCondition; + public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T> { protected static final Logger logger = LoggerFactory.getLogger(AbstractWriteResponseHandler.class); @@ -46,11 +53,9 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW //Count down until all responses and expirations have occured before deciding whether the ideal CL was reached. private AtomicInteger responsesAndExpirations; private final SimpleCondition condition = new SimpleCondition(); - protected final Keyspace keyspace; - protected final Collection<InetAddressAndPort> naturalEndpoints; - public final ConsistencyLevel consistencyLevel; + protected final ReplicaLayout.ForToken replicaLayout; + protected final Runnable callback; - protected final Collection<InetAddressAndPort> pendingEndpoints; protected final WriteType writeType; private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); @@ -60,7 +65,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private volatile boolean supportsBackPressure = true; /** - * Delegate to another WriteReponseHandler or possibly this one to track if the ideal consistency level was reached. + * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached. * Will be set to null if ideal CL was not configured * Will be set to an AWRH delegate if ideal CL was configured * Will be same as "this" if this AWRH is the ideal consistency level @@ -71,18 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW * @param callback A callback to be called when the write is successful. * @param queryStartNanoTime */ - protected AbstractWriteResponseHandler(Keyspace keyspace, - Collection<InetAddressAndPort> naturalEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistencyLevel, + protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { - this.keyspace = keyspace; - this.pendingEndpoints = pendingEndpoints; - this.consistencyLevel = consistencyLevel; - this.naturalEndpoints = naturalEndpoints; + this.replicaLayout = replicaLayout; this.callback = callback; this.writeType = writeType; this.failureReasonByEndpoint = new ConcurrentHashMap<>(); @@ -112,12 +111,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW // avoid sending confusing info to the user (see CASSANDRA-6491). if (acks >= blockedFor) acks = blockedFor - 1; - throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor); + throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor); } if (totalBlockFor() + failures > totalEndpoints()) { - throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint); + throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint); } } @@ -136,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler) { this.idealCLDelegate = handler; - idealCLDelegate.responsesAndExpirations = new AtomicInteger(naturalEndpoints.size() + pendingEndpoints.size()); + idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size()); } /** @@ -194,15 +193,20 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW { // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) - return consistencyLevel.blockFor(keyspace) + pendingEndpoints.size(); + return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending()); } /** - * @return the total number of endpoints the request has been sent to. + * @return the total number of endpoints the request can been sent to. */ protected int totalEndpoints() { - return naturalEndpoints.size() + pendingEndpoints.size(); + return replicaLayout.all().size(); + } + + public ConsistencyLevel consistencyLevel() + { + return replicaLayout.consistencyLevel(); } /** @@ -225,7 +229,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public void assureSufficientLiveNodes() throws UnavailableException { - consistencyLevel.assureSufficientLiveNodes(keyspace, Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), isAlive)); + replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending()); } protected void signal() @@ -274,12 +278,49 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW //The condition being signaled is a valid proxy for the CL being achieved if (!condition.isSignaled()) { - keyspace.metric.writeFailedIdealCL.inc(); + replicaLayout.keyspace().metric.writeFailedIdealCL.inc(); } else { - keyspace.metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime); + replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime); } } } + + /** + * Cheap Quorum backup. If we failed to reach quorum with our initial (full) nodes, reach out to other nodes. + */ + public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC) + { + if (replicaLayout.all().size() == replicaLayout.selected().size()) + return; + + long timeout = Long.MAX_VALUE; + List<ColumnFamilyStore> cfs = mutation.getTableIds().stream() + .map(Schema.instance::getColumnFamilyStoreInstance) + .collect(Collectors.toList()); + for (ColumnFamilyStore cf : cfs) + timeout = Math.min(timeout, cf.transientWriteLatencyNanos); + + // no latency information, or we're overloaded + if (timeout > TimeUnit.MILLISECONDS.toNanos(mutation.getTimeout())) + return; + + try + { + if (!condition.await(timeout, TimeUnit.NANOSECONDS)) + { + for (ColumnFamilyStore cf : cfs) + cf.metric.speculativeWrites.inc(); + + writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(), + (AbstractWriteResponseHandler<IMutation>) this, + localDC); + } + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index b60088c..9f37095 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -38,6 +38,8 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.cassandra.locator.EndpointsByRange; +import org.apache.cassandra.locator.EndpointsForRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,11 +62,14 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.CommonRange; +import org.apache.cassandra.repair.RepairRunnable; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; @@ -79,7 +84,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; -import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.queue; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.transform; /** * ActiveRepairService is the starting point for manual "active" repairs. @@ -204,10 +210,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * @return Future for asynchronous call or null if there is no need to repair */ public RepairSession submitRepairSession(UUID parentRepairSession, - Collection<Range<Token>> range, + CommonRange range, String keyspace, RepairParallelism parallelismDegree, - Set<InetAddressAndPort> endpoints, boolean isIncremental, boolean pullRepair, boolean force, @@ -216,14 +221,15 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai ListeningExecutorService executor, String... cfnames) { - if (endpoints.isEmpty()) + if (range.endpoints.isEmpty()) return null; if (cfnames.length == 0) return null; - - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, + parallelismDegree, isIncremental, pullRepair, force, + previewKind, optimiseStreams, cfnames); sessions.put(session.getId(), session); // register listeners @@ -296,12 +302,12 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * * @return neighbors with whom we share the provided range */ - public static Set<InetAddressAndPort> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges, - Range<Token> toRepair, Collection<String> dataCenters, - Collection<String> hosts) + public static EndpointsForRange getNeighbors(String keyspaceName, Iterable<Range<Token>> keyspaceLocalRanges, + Range<Token> toRepair, Collection<String> dataCenters, + Collection<String> hosts) { StorageService ss = StorageService.instance; - Map<Range<Token>, List<InetAddressAndPort>> replicaSets = ss.getRangeToAddressMap(keyspaceName); + EndpointsByRange replicaSets = ss.getRangeToAddressMap(keyspaceName); Range<Token> rangeSuperSet = null; for (Range<Token> range : keyspaceLocalRanges) { @@ -319,23 +325,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) - return Collections.emptySet(); + return EndpointsForRange.empty(toRepair); - Set<InetAddressAndPort> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet)); - neighbors.remove(FBUtilities.getBroadcastAddressAndPort()); + EndpointsForRange neighbors = replicaSets.get(rangeSuperSet).withoutSelf(); if (dataCenters != null && !dataCenters.isEmpty()) { TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology(); - Set<InetAddressAndPort> dcEndpoints = Sets.newHashSet(); - Multimap<String,InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints(); - for (String dc : dataCenters) - { - Collection<InetAddressAndPort> c = dcEndpointsMap.get(dc); - if (c != null) - dcEndpoints.addAll(c); - } - return Sets.intersection(neighbors, dcEndpoints); + Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints(); + Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get)); + return neighbors.keep(dcEndpoints); } else if (hosts != null && !hosts.isEmpty()) { @@ -345,7 +344,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai try { final InetAddressAndPort endpoint = InetAddressAndPort.getByName(host.trim()); - if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || neighbors.contains(endpoint)) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || neighbors.endpoints().contains(endpoint)) specifiedHost.add(endpoint); } catch (UnknownHostException e) @@ -366,8 +365,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } specifiedHost.remove(FBUtilities.getBroadcastAddressAndPort()); - return specifiedHost; - + return neighbors.keep(specifiedHost); } return neighbors; @@ -594,10 +592,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public Set<TableId> getTableIds() { - return ImmutableSet.copyOf(Iterables.transform(getColumnFamilyStores(), cfs -> cfs.metadata.id)); + return ImmutableSet.copyOf(transform(getColumnFamilyStores(), cfs -> cfs.metadata.id)); } - public Collection<Range<Token>> getRanges() + public Set<Range<Token>> getRanges() { return ImmutableSet.copyOf(ranges); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java index e373fb6..ee74df5 100644 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime) { - super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType, queryStartNanoTime); + super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime); this.wrapped = wrapped; this.requiredBeforeFinish = requiredBeforeFinish; this.cleanup = cleanup; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index dbd3667..d4cdcc6 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -17,16 +17,15 @@ */ package org.apache.cassandra.service; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.IEndpointSnitch; -import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; @@ -41,29 +40,26 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); private final AtomicInteger acks = new AtomicInteger(0); - public DatacenterSyncWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistencyLevel, - Keyspace keyspace, + public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { // Response is been managed by the map so make it 1 for the superclass. - super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime); - assert consistencyLevel == ConsistencyLevel.EACH_QUORUM; + super(replicaLayout, callback, writeType, queryStartNanoTime); + assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM; - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); + NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy(); for (String dc : strategy.getDatacenters()) { - int rf = strategy.getReplicationFactor(dc); + int rf = strategy.getReplicationFactor(dc).allReplicas; responses.put(dc, new AtomicInteger((rf / 2) + 1)); } // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) - for (InetAddressAndPort pending : pendingEndpoints) + for (Replica pending : replicaLayout.pending()) { responses.get(snitch.getDatacenter(pending)).incrementAndGet(); } @@ -105,4 +101,5 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse { return false; } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index a8d7b28..b458a71 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -17,29 +17,23 @@ */ package org.apache.cassandra.service; -import java.util.Collection; - -import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.WriteType; /** * This class blocks for a quorum of responses _in the local datacenter only_ (CL.LOCAL_QUORUM). */ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> { - public DatacenterWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistencyLevel, - Keyspace keyspace, + public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { - super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime); - assert consistencyLevel.isDatacenterLocal(); + super(replicaLayout, callback, writeType, queryStartNanoTime); + assert replicaLayout.consistencyLevel().isDatacenterLocal(); } @Override @@ -58,16 +52,8 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T> } @Override - protected int totalBlockFor() - { - // during bootstrap, include pending endpoints (only local here) in the count - // or we may fail the consistency level guarantees (see #833, #8058) - return consistencyLevel.blockFor(keyspace) + consistencyLevel.countLocalEndpoints(pendingEndpoints); - } - - @Override protected boolean waitingFor(InetAddressAndPort from) { - return consistencyLevel.isLocal(from); + return replicaLayout.consistencyLevel().isLocal(from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java index e1c0f55..7b6bd58 100644 --- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java +++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java @@ -92,7 +92,7 @@ public class PendingRangeCalculatorService { int jobs = updateJobs.incrementAndGet(); PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, jobs); - executor.submit(new PendingRangeTask(updateJobs)); + executor.execute(new PendingRangeTask(updateJobs)); } public void blockUntilFinished() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org