Repository: cassandra Updated Branches: refs/heads/trunk c5a7fcaa8 -> eda2613bc
Fix some regressions caused by 14058 Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-14353 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eda2613b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eda2613b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eda2613b Branch: refs/heads/trunk Commit: eda2613bcca7e4734b3bb4e04c6a4df39eb59ca2 Parents: c5a7fca Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Mar 28 15:03:52 2018 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Apr 6 16:20:47 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/reads/DataResolver.java | 114 +++++++++++++++++-- .../reads/repair/BlockingReadRepair.java | 68 +++++++++-- .../service/reads/repair/NoopReadRepair.java | 1 - .../repair/PartitionIteratorMergeListener.java | 5 - .../service/reads/repair/ReadRepair.java | 1 + .../reads/repair/RowIteratorMergeListener.java | 30 ----- .../reads/repair/TestableReadRepair.java | 1 - 8 files changed, 165 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3db7f27..d6cf162 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Fix some regressions caused by 14058 (CASSANDRA-14353) * Abstract repair for pluggable storage (CASSANDRA-14116) * Add meaningful toString() impls (CASSANDRA-13653) * Add sstableloader option to accept target keyspace name (CASSANDRA-13884) http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java index 11dd083..4c7a6c9 100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@ -17,12 +17,17 @@ */ package org.apache.cassandra.service.reads; -import java.net.InetAddress; import java.util.*; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.reads.repair.ReadRepair; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; @@ -33,11 +38,6 @@ import org.apache.cassandra.tracing.TraceState; public class DataResolver extends ResponseResolver { - private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = - Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); - - @VisibleForTesting - final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); private final long queryStartNanoTime; private final boolean enforceStrictLiveness; @@ -114,7 +114,7 @@ public class DataResolver extends ResponseResolver results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness)); } - return UnfilteredPartitionIterators.merge(results, command.nowInSec(), readRepair.getMergeListener(sources)); + return UnfilteredPartitionIterators.merge(results, command.nowInSec(), wrapMergeListener(readRepair.getMergeListener(sources), sources)); } public void evaluateAllResponses() @@ -130,4 +130,102 @@ public class DataResolver extends ResponseResolver { evaluateAllResponses(); } + + private String makeResponsesDebugString(DecoratedKey partitionKey) + { + return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); + } + + private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, InetAddressAndPort[] sources) + { + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions); + + return new UnfilteredRowIterators.MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + try + { + rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + mergedDeletion == null ? "null" : mergedDeletion.toString(), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRows(Row merged, Row[] versions) + { + try + { + rowListener.onMergedRows(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights + // when that happen without more context that what the assertion errors give us however, hence the + // catch here that basically gather as much as context as reasonable. + rowListener.onMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + + } + + public void close() + { + rowListener.close(); + } + }; + } + + public void close() + { + partitionListener.close(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 07b6e2c..8689356 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -38,7 +38,10 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; @@ -57,7 +60,6 @@ import org.apache.cassandra.service.reads.ReadCallback; import org.apache.cassandra.service.reads.ResponseResolver; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.concurrent.Accumulator; /** * 'Classic' read repair. Doesn't allow the client read to return until @@ -67,6 +69,9 @@ public class BlockingReadRepair implements ReadRepair, RepairListener { private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class); + private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = + Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); + private final ReadCommand command; private final List<InetAddressAndPort> endpoints; private final long queryStartNanoTime; @@ -110,24 +115,66 @@ public class BlockingReadRepair implements ReadRepair, RepairListener { final List<AsyncOneResponse<?>> responses; + final ReadCommand command; + final ConsistencyLevel consistency; - public BlockingPartitionRepair(int expectedResponses) + public BlockingPartitionRepair(int expectedResponses, ReadCommand command, ConsistencyLevel consistency) { this.responses = new ArrayList<>(expectedResponses); + this.command = command; + this.consistency = consistency; } - protected AsyncOneResponse sendMutation(InetAddressAndPort endpoint, Mutation mutation) + private AsyncOneResponse sendRepairMutation(Mutation mutation, InetAddressAndPort destination) { - // use a separate verb here because we don't want these to be get the white glove hint- - // on-timeout behavior that a "real" mutation gets - Tracing.trace("Sending read-repair-mutation to {}", endpoint); - MessageOut<Mutation> msg = mutation.createMessage(MessagingService.Verb.READ_REPAIR); - return MessagingService.instance().sendRR(msg, endpoint); + DecoratedKey key = mutation.key(); + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + int messagingVersion = MessagingService.instance().getVersion(destination); + + int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); + int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); + + AsyncOneResponse callback = null; + + if (mutationSize <= maxMutationSize) + { + Tracing.trace("Sending read-repair-mutation to {}", destination); + // use a separate verb here to avoid writing hints on timeouts + MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); + callback = MessagingService.instance().sendRR(message, destination); + ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark(); + } + else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) + { + logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata(), + command.metadata().partitionKeyType.getString(key.getKey()), + destination); + } + else + { + logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata(), + command.metadata().partitionKeyType.getString(key.getKey()), + destination); + + int blockFor = consistency.blockFor(keyspace); + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + return callback; } public void reportMutation(InetAddressAndPort endpoint, Mutation mutation) { - responses.add(sendMutation(endpoint, mutation)); + AsyncOneResponse<?> response = sendRepairMutation(mutation, endpoint); + + if (response != null) + responses.add(response); } public void finish() @@ -169,12 +216,11 @@ public class BlockingReadRepair implements ReadRepair, RepairListener { throw new RuntimeException(e); } - } public PartitionRepair startPartitionRepair() { - BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size()); + BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size(), command, consistency); repairs.add(repair); return repair; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index ff65dbb..39f5bff 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.reads.repair; -import java.net.InetAddress; import java.util.List; import java.util.function.Consumer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java index 3ad57cf..4ccdcbf 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java @@ -20,9 +20,6 @@ package org.apache.cassandra.service.reads.repair; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Columns; import org.apache.cassandra.db.DecoratedKey; @@ -35,8 +32,6 @@ import org.apache.cassandra.locator.InetAddressAndPort; public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener { - private static final Logger logger = LoggerFactory.getLogger(PartitionIteratorMergeListener.class); - private final InetAddressAndPort[] sources; private final ReadCommand command; private final RepairListener repairListener; http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index bdd730c..21cab20 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -33,6 +33,7 @@ import org.apache.cassandra.tracing.TraceState; public interface ReadRepair { + /** * Used by DataResolver to generate corrections as the partition iterator is consumed */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index 44b6eeb..f11d264 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -20,9 +20,6 @@ package org.apache.cassandra.service.reads.repair; import java.util.Arrays; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; - import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringBound; import org.apache.cassandra.db.DecoratedKey; @@ -44,7 +41,6 @@ import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.TableMetadata; public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener { @@ -186,32 +182,6 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) { - try - { - // The code for merging range tombstones is a tad complex and we had the assertions there triggered - // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights - // when that happen without more context that what the assertion errors give us however, hence the - // catch here that basically gather as much as context as reasonable. - internalOnMergedRangeTombstoneMarkers(merged, versions); - } - catch (AssertionError e) - { - // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd - // rather get more info to debug than not. - TableMetadata table = command.metadata(); - String details = String.format("Error merging RTs on %s: command=%s, reversed=%b, merged=%s, versions=%s, sources={%s}", - table, - command.toCQLString(), - isReversed, - merged == null ? "null" : merged.toString(table), - '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', - Arrays.toString(sources)); - throw new AssertionError(details, e); - } - } - - private void internalOnMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { // The current deletion as of dealing with this marker. DeletionTime currentDeletion = currentDeletion(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/eda2613b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java index d125c9d..522e524 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service.reads.repair; -import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org