Add flag to allow dropping oversized read repair mutations

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-13975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4 Branch: refs/heads/cassandra-3.11 Commit: f1e850a492126572efc636a6838cff90333806b9 Parents: f767d35 Author: Aleksey Yeschenko <alek...@yeschenko.com> Authored: Wed Oct 25 20:15:39 2017 +0100 Committer: Aleksey Yeschenko <alek...@yeschenko.com> Committed: Mon Nov 13 13:10:28 2017 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/metrics/TableMetrics.java | 2 + .../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++--- 3 files changed, 49 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e3026aa..a3c43fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.16 + * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975) * Fix SSTableLoader logger message (CASSANDRA-14003) * Fix repair race that caused gossip to block (CASSANDRA-13849) * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964) @@ -8,6 +9,7 @@ * Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939) * Deserialise sstable metadata in nodetool verify (CASSANDRA-13922) + 3.0.15 * Improve TRUNCATE performance (CASSANDRA-13909) * Implement short read protection on partition boundaries (CASSANDRA-13595) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index fe88a63..eb56ed9 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -151,6 +151,7 @@ public class TableMetrics public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range"); + public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; @@ -648,6 +649,7 @@ public class TableMetrics casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); + readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests")); shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests")); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 5fb34c6..f02b565 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities; public class DataResolver extends ResponseResolver { + private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = + Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); + @VisibleForTesting final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); @@ -452,15 +455,49 @@ public class 
DataResolver extends ResponseResolver public void close() { for (int i = 0; i < repairs.length; i++) + if (null != repairs[i]) + sendRepairMutation(repairs[i], sources[i]); + } + + private void sendRepairMutation(PartitionUpdate partition, InetAddress destination) + { + Mutation mutation = new Mutation(partition); + int messagingVersion = MessagingService.instance().getVersion(destination); + + int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); + int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); + + if (mutationSize <= maxMutationSize) { - if (repairs[i] == null) - continue; - - // use a separate verb here because we don't want these to be get the white glove hint- - // on-timeout behavior that a "real" mutation gets - Tracing.trace("Sending read-repair-mutation to {}", sources[i]); - MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR); - repairResults.add(MessagingService.instance().sendRR(msg, sources[i])); + Tracing.trace("Sending read-repair-mutation to {}", destination); + // use a separate verb here to avoid writing hints on timeouts + MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); + repairResults.add(MessagingService.instance().sendRR(message, destination)); + ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark(); + } + else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) + { + logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata().ksName, + command.metadata().cfName, + command.metadata().getKeyValidator().getString(partitionKey.getKey()), + destination); + } + else + { + logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata().ksName, + command.metadata().cfName, + 
command.metadata().getKeyValidator().getString(partitionKey.getKey()), + destination); + + int blockFor = consistency.blockFor(keyspace); + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org