http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/DataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java index 4c7a6c9,0000000..ebf6a6f mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@@ -1,231 -1,0 +1,214 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.reads; + +import java.util.*; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; + ++import org.apache.cassandra.db.*; ++import org.apache.cassandra.db.filter.*; ++import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; ++import org.apache.cassandra.db.transform.*; +import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.net.*; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.ReadRepair; - import org.apache.cassandra.db.*; - import org.apache.cassandra.db.filter.*; - import org.apache.cassandra.db.partitions.*; - import org.apache.cassandra.db.transform.*; - import org.apache.cassandra.net.*; - import org.apache.cassandra.tracing.TraceState; + +public class DataResolver extends ResponseResolver +{ + private final long queryStartNanoTime; + private final boolean enforceStrictLiveness; + + public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime, ReadRepair readRepair) + { + super(keyspace, command, consistency, readRepair, maxResponseCount); + this.queryStartNanoTime = queryStartNanoTime; + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); + } + + public PartitionIterator getData() + { + ReadResponse response = responses.iterator().next().payload; + return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); + } + + public boolean isDataPresent() + { + return !responses.isEmpty(); + } + + public PartitionIterator resolve() + { + // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here + // at the beginning of this method), so grab the response count once and use that through the method. 
+ int count = responses.size(); + List<UnfilteredPartitionIterator> iters = new ArrayList<>(count); + InetAddressAndPort[] sources = new InetAddressAndPort[count]; + for (int i = 0; i < count; i++) + { + MessageIn<ReadResponse> msg = responses.get(i); + iters.add(msg.payload.makeIterator(command)); + sources[i] = msg.from; + } + + /* + * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, + * have more rows than the client requested. To make sure that we still conform to the original limit, + * we apply a top-level post-reconciliation counter to the merged partition iterator. + * + * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied + * to the current partition to work. For this reason we have to apply the counter transformation before + * empty partition discard logic kicks in - for it will eagerly consume the iterator. + * + * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions + * + * See CASSANDRA-13747 for more details. + */ + + DataLimits.Counter mergedResultCounter = - command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); ++ command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness); + + UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter); + FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); + PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); + return Transformation.apply(counted, new EmptyPartitionsDiscarder()); + } + + private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, + InetAddressAndPort[] sources, + DataLimits.Counter mergedResultCounter) + { + // If we have only one results, there is no read repair to do and we can't get short reads + if (results.size() == 1) + return results.get(0); + + /* + * So-called short reads stems from nodes returning only a subset of the results they have due to the limit, + * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother. 
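To make the short-read scenario described in that comment concrete, here is a minimal, self-contained sketch in plain Java (hypothetical Cell and row names, not Cassandra types): each replica honours LIMIT 2 locally, yet after reconciliation a newer tombstone shadows a stale row and only one live row survives — the situation ShortReadProtection.extend() compensates for by fetching more rows.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShortReadDemo
{
    // Hypothetical, simplified "cell": a row name, a write timestamp, and a tombstone flag.
    static final class Cell
    {
        final String row; final long timestamp; final boolean tombstone;
        Cell(String row, long timestamp, boolean tombstone)
        {
            this.row = row; this.timestamp = timestamp; this.tombstone = tombstone;
        }
    }

    public static void main(String[] args)
    {
        int limit = 2;

        // Replica A saw the deletion of row1 (ts=10); with LIMIT 2 it returns the tombstone and row2.
        List<Cell> replicaA = new ArrayList<>();
        replicaA.add(new Cell("row1", 10, true));
        replicaA.add(new Cell("row2", 5, false));

        // Replica B missed the deletion; with LIMIT 2 it returns a stale live row1 and row2.
        List<Cell> replicaB = new ArrayList<>();
        replicaB.add(new Cell("row1", 4, false));
        replicaB.add(new Cell("row2", 5, false));

        // Reconciliation: the cell with the highest timestamp wins for each row.
        Map<String, Cell> reconciled = new TreeMap<>();
        List<Cell> all = new ArrayList<>(replicaA);
        all.addAll(replicaB);
        for (Cell c : all)
            reconciled.merge(c.row, c, (a, b) -> a.timestamp >= b.timestamp ? a : b);

        // Rows shadowed by tombstones are not returned to the client.
        long live = reconciled.values().stream().filter(c -> !c.tombstone).count();

        // Prints "limit=2, live rows after merge=1": a short read, even though
        // every replica individually respected the limit.
        System.out.println("limit=" + limit + ", live rows after merge=" + live);
    }
}

The real code does this per partition with DataLimits.Counter and, roughly speaking, re-queries the sources that may still hold more rows until the merged counter is satisfied.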
+ */ + if (!command.limits().isUnlimited()) + for (int i = 0; i < results.size(); i++) - { + results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness)); - } + + return UnfilteredPartitionIterators.merge(results, command.nowInSec(), wrapMergeListener(readRepair.getMergeListener(sources), sources)); + } + - public void evaluateAllResponses() - { - // We need to fully consume the results to trigger read repairs if appropriate - try (PartitionIterator iterator = resolve()) - { - PartitionIterators.consume(iterator); - } - } - - public void evaluateAllResponses(TraceState traceState) - { - evaluateAllResponses(); - } - + private String makeResponsesDebugString(DecoratedKey partitionKey) + { + return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey))); + } + + private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, InetAddressAndPort[] sources) + { + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions); + + return new UnfilteredRowIterators.MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + try + { + rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + mergedDeletion == null ? "null" : mergedDeletion.toString(), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRows(Row merged, Row[] versions) + { + try + { + rowListener.onMergedRows(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). 
It's hard to get insights + // when that happen without more context that what the assertion errors give us however, hence the + // catch here that basically gather as much as context as reasonable. + rowListener.onMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + Arrays.toString(sources), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + + } + + public void close() + { + rowListener.close(); + } + }; + } + + public void close() + { + partitionListener.close(); + } + }; + } +}
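The merge listener above is wrapped purely to enrich assertion failures with context (the merged value, each replica's version, the sources, and the raw responses). Stripped of Cassandra types, the pattern looks like this — interface and class names below are made up for illustration, not the actual API:

import java.util.Arrays;

// Illustrative names only; not the Cassandra API.
interface RowMergeObserver
{
    void onMergedRows(String merged, String[] versions);
}

final class ContextAddingObserver implements RowMergeObserver
{
    private final RowMergeObserver delegate;
    private final String sources;

    ContextAddingObserver(RowMergeObserver delegate, String sources)
    {
        this.delegate = delegate;
        this.sources = sources;
    }

    @Override
    public void onMergedRows(String merged, String[] versions)
    {
        try
        {
            delegate.onMergedRows(merged, versions);
        }
        catch (AssertionError e)
        {
            // Rebuild the error with everything needed to debug the merge, keeping
            // the original failure as the cause so its stack trace is preserved.
            String details = String.format("Error merging rows: merged=%s, versions=%s, sources=%s",
                                           merged, Arrays.toString(versions), sources);
            throw new AssertionError(details, e);
        }
    }
}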
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/DigestResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/DigestResolver.java index 828a65e,0000000..b2eb0c6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@@ -1,93 -1,0 +1,85 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.reads; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.tracing.TraceState; + +public class DigestResolver extends ResponseResolver +{ + private volatile ReadResponse dataResponse; + + public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount) + { + super(keyspace, command, consistency, readRepair, maxResponseCount); + Preconditions.checkArgument(command instanceof SinglePartitionReadCommand, + "DigestResolver can only be used with SinglePartitionReadCommand commands"); + } + + @Override + public void preprocess(MessageIn<ReadResponse> message) + { + super.preprocess(message); + if (dataResponse == null && !message.payload.isDigestResponse()) + dataResponse = message.payload; + } + + public PartitionIterator getData() + { + assert isDataPresent(); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); + } + + public boolean responsesMatch() + { + long start = System.nanoTime(); + + // validate digests against each other; return false immediately on mismatch. 
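DigestResolver.responsesMatch() continues below with the loop this comment announces. Extracted into a standalone helper (a sketch, not the actual method), the check is simply: compare every digest against the first one seen and report a mismatch as soon as two differ.

import java.nio.ByteBuffer;
import java.util.List;

final class DigestCheck
{
    // Returns true when all digests are byte-for-byte identical.
    static boolean allMatch(List<ByteBuffer> digests)
    {
        ByteBuffer first = null;
        for (ByteBuffer digest : digests)
        {
            if (first == null)
                first = digest;            // remember the first digest seen
            else if (!first.equals(digest))
                return false;              // any disagreement means a mismatch
        }
        return true;
    }
}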
+ ByteBuffer digest = null; + for (MessageIn<ReadResponse> message : responses) + { + ReadResponse response = message.payload; + + ByteBuffer newDigest = response.digest(command); + if (digest == null) + digest = newDigest; + else if (!digest.equals(newDigest)) + // rely on the fact that only single partition queries use digests + return false; + } + + if (logger.isTraceEnabled()) + logger.trace("responsesMatch: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + + return true; + } + - public void evaluateAllResponses(TraceState traceState) - { - if (!responsesMatch()) - { - readRepair.backgroundDigestRepair(traceState); - } - } - + public boolean isDataPresent() + { + return dataResponse != null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/ReadCallback.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/ReadCallback.java index 62fdfaa,0000000..a35fc2e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@@ -1,213 -1,0 +1,206 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.reads; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.SimpleCondition; + +public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse> +{ + protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class ); + + public final ResponseResolver resolver; + final SimpleCondition condition = new SimpleCondition(); + private final long queryStartNanoTime; + final int blockfor; + final List<InetAddressAndPort> endpoints; + private final ReadCommand command; + private final ConsistencyLevel consistencyLevel; + private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater + = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received"); + private volatile int received = 0; + private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater + = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures"); + private volatile int failures = 0; + private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint; + + private final Keyspace keyspace; // TODO push this into ConsistencyLevel? + + private final ReadRepair readRepair; + + /** + * Constructor when response count has to be calculated and blocked for. 
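The constructor that follows derives blockfor from the consistency level via ConsistencyLevel.blockFor(keyspace). As a rough, single-datacenter illustration of what that number means (a simplified sketch, not the real implementation):

final class BlockForExample
{
    // Simplified illustration: how many replica responses a read must wait for.
    static int blockFor(String consistencyLevel, int replicationFactor)
    {
        switch (consistencyLevel)
        {
            case "ONE":    return 1;
            case "TWO":    return 2;
            case "THREE":  return 3;
            case "QUORUM": return (replicationFactor / 2) + 1; // majority of replicas
            case "ALL":    return replicationFactor;
            default: throw new IllegalArgumentException("unhandled level: " + consistencyLevel);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(blockFor("QUORUM", 3)); // prints 2
    }
}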
+ */ + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime, ReadRepair readRepair) + { + this(resolver, + consistencyLevel, + consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)), + command, + Keyspace.open(command.metadata().keyspace), + filteredEndpoints, + queryStartNanoTime, readRepair); + } + + public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ReadRepair readRepair) + { + this.command = command; + this.keyspace = keyspace; + this.blockfor = blockfor; + this.consistencyLevel = consistencyLevel; + this.resolver = resolver; + this.queryStartNanoTime = queryStartNanoTime; + this.endpoints = endpoints; + this.readRepair = readRepair; + this.failureReasonByEndpoint = new ConcurrentHashMap<>(); + // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897) + assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size(); + + if (logger.isTraceEnabled()) + logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ",")); + } + + public boolean await(long timePastStart, TimeUnit unit) + { + long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime); + try + { + return condition.await(time, TimeUnit.NANOSECONDS); + } + catch (InterruptedException ex) + { + throw new AssertionError(ex); + } + } + + public void awaitResults() throws ReadFailureException, ReadTimeoutException + { + boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS); + boolean failed = blockfor + failures > endpoints.size(); + if (signaled && !failed) + return; + + if (Tracing.isTracing()) + { + String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; + Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); + } + else if (logger.isDebugEnabled()) + { + String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; + logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData }); + } + + // Same as for writes, see AbstractWriteResponseHandler + throw failed + ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint) + : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent()); + } + + public int blockFor() + { + return blockfor; + } + + public void response(MessageIn<ReadResponse> message) + { + resolver.preprocess(message); + int n = waitingFor(message.from) + ? recievedUpdater.incrementAndGet(this) + : received; ++ + if (n >= blockfor && resolver.isDataPresent()) - { + condition.signalAll(); - // kick off a background digest comparison if this is a result that (may have) arrived after - // the original resolve that get() kicks off as soon as the condition is signaled - if (blockfor < endpoints.size() && n == endpoints.size()) - { - readRepair.maybeStartBackgroundRepair(resolver); - } - } + } + + /** + * @return true if the message counts towards the blockfor threshold + */ + private boolean waitingFor(InetAddressAndPort from) + { + return consistencyLevel.isDatacenterLocal() + ? 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)) + : true; + } + + /** + * @return the current number of received responses + */ + public int getReceivedCount() + { + return received; + } + + public void response(ReadResponse result) + { + MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(), + result, + Collections.emptyMap(), + MessagingService.Verb.INTERNAL_RESPONSE, + MessagingService.current_version); + response(message); + } + + public void assureSufficientLiveNodes() throws UnavailableException + { + consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints); + } + + public boolean isLatencyForSnitch() + { + return true; + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) + { + int n = waitingFor(from) + ? failuresUpdater.incrementAndGet(this) + : failures; + + failureReasonByEndpoint.put(from, failureReason); + + if (blockfor + n > endpoints.size()) + condition.signalAll(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/ResponseResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/ResponseResolver.java index 69ec063,0000000..f4f00a2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@@ -1,66 -1,0 +1,60 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
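ReadCallback above signals the waiting thread as soon as success becomes impossible: once blockfor plus the failure count exceeds the number of endpoints, the remaining replicas can no longer deliver blockfor responses. A tiny worked sketch of that arithmetic (an illustrative helper, not the real class):

final class FailFastExample
{
    // True when enough failures have arrived that blockFor successes can never be reached.
    static boolean cannotSucceed(int blockFor, int failures, int endpoints)
    {
        return blockFor + failures > endpoints;
    }

    public static void main(String[] args)
    {
        // A QUORUM read over 3 endpoints needs 2 responses.
        System.out.println(cannotSucceed(2, 1, 3)); // false: 2 successes are still possible
        System.out.println(cannotSucceed(2, 2, 3)); // true: at most 1 endpoint is left, fail now
    }
}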
+ */ +package org.apache.cassandra.service.reads; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.service.reads.repair.ReadRepair; - import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.utils.concurrent.Accumulator; + +public abstract class ResponseResolver +{ + protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class); + + protected final Keyspace keyspace; + protected final ReadCommand command; + protected final ConsistencyLevel consistency; + protected final ReadRepair readRepair; + + // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints + protected final Accumulator<MessageIn<ReadResponse>> responses; + + public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount) + { + this.keyspace = keyspace; + this.command = command; + this.consistency = consistency; + this.readRepair = readRepair; + this.responses = new Accumulator<>(maxResponseCount); + } + - /** - * Consume the accumulated responses, starting a read repair if neccesary - */ - public abstract void evaluateAllResponses(TraceState traceState); - + public abstract boolean isDataPresent(); + + public void preprocess(MessageIn<ReadResponse> message) + { + responses.add(message); + } + + public Accumulator<MessageIn<ReadResponse>> getMessages() + { + return responses; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 8689356,0000000..f207b7d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@@ -1,280 -1,0 +1,250 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
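ResponseResolver above stores replica responses in an Accumulator, an append-only, fixed-capacity collection that accepts additions without locking. A minimal sketch of that idea follows (illustrative only; Cassandra's Accumulator additionally publishes a separate present-count so readers never observe a slot that has been claimed but not yet written):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Append-only, fixed-capacity, lock-free accumulator (simplified illustration).
final class SimpleAccumulator<T>
{
    private final AtomicReferenceArray<T> slots;
    private final AtomicInteger claimed = new AtomicInteger();

    SimpleAccumulator(int capacity)
    {
        this.slots = new AtomicReferenceArray<>(capacity);
    }

    void add(T value)
    {
        int index = claimed.getAndIncrement(); // claim a slot without blocking
        slots.set(index, value);               // publish the element into the slot
    }

    int size()
    {
        return claimed.get();
    }

    T get(int index)
    {
        return slots.get(index);
    }
}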
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import org.apache.cassandra.concurrent.Stage; - import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.net.AsyncOneResponse; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; - import org.apache.cassandra.service.reads.AsyncRepairCallback; +import org.apache.cassandra.service.reads.DataResolver; +import org.apache.cassandra.service.reads.DigestResolver; +import org.apache.cassandra.service.reads.ReadCallback; - import org.apache.cassandra.service.reads.ResponseResolver; - import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; + +/** + * 'Classic' read repair. Doesn't allow the client read to return until + * updates have been written to nodes needing correction. 
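The class below guards oversized read-repair mutations behind the cassandra.drop_oversized_readrepair_mutations system property, read once via Boolean.getBoolean. Note that Boolean.getBoolean(name) is true only when a system property with that name exists and equals "true" (ignoring case); a small demo:

public class FlagDemo
{
    public static void main(String[] args)
    {
        // Unset property: Boolean.getBoolean returns false.
        System.out.println(Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"));

        // Normally supplied on the command line as
        // -Dcassandra.drop_oversized_readrepair_mutations=true; set here only for the demo.
        System.setProperty("cassandra.drop_oversized_readrepair_mutations", "true");
        System.out.println(Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations")); // true
    }
}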
+ */ +public class BlockingReadRepair implements ReadRepair, RepairListener +{ + private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class); + + private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS = + Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); + + private final ReadCommand command; + private final List<InetAddressAndPort> endpoints; + private final long queryStartNanoTime; + private final ConsistencyLevel consistency; + + private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>(); + + private volatile DigestRepair digestRepair = null; + + private static class DigestRepair + { + private final DataResolver dataResolver; + private final ReadCallback readCallback; + private final Consumer<PartitionIterator> resultConsumer; + + public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer) + { + this.dataResolver = dataResolver; + this.readCallback = readCallback; + this.resultConsumer = resultConsumer; + } + } + + public BlockingReadRepair(ReadCommand command, + List<InetAddressAndPort> endpoints, + long queryStartNanoTime, + ConsistencyLevel consistency) + { + this.command = command; + this.endpoints = endpoints; + this.queryStartNanoTime = queryStartNanoTime; + this.consistency = consistency; + } + + public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + { + return new PartitionIteratorMergeListener(endpoints, command, this); + } + + public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair + { + + final List<AsyncOneResponse<?>> responses; + final ReadCommand command; + final ConsistencyLevel consistency; + + public BlockingPartitionRepair(int expectedResponses, ReadCommand command, ConsistencyLevel consistency) + { + this.responses = new ArrayList<>(expectedResponses); + this.command = command; + this.consistency = consistency; + } + + private AsyncOneResponse sendRepairMutation(Mutation mutation, InetAddressAndPort destination) + { + DecoratedKey key = mutation.key(); + Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + int messagingVersion = MessagingService.instance().getVersion(destination); + + int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion); + int maxMutationSize = DatabaseDescriptor.getMaxMutationSize(); + + AsyncOneResponse callback = null; + + if (mutationSize <= maxMutationSize) + { + Tracing.trace("Sending read-repair-mutation to {}", destination); + // use a separate verb here to avoid writing hints on timeouts + MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR); + callback = MessagingService.instance().sendRR(message, destination); + ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark(); + } + else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS) + { + logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata(), + command.metadata().partitionKeyType.getString(key.getKey()), + destination); + } + else + { + logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}", + mutationSize, + maxMutationSize, + command.metadata(), + command.metadata().partitionKeyType.getString(key.getKey()), + destination); + + int blockFor = consistency.blockFor(keyspace); + Tracing.trace("Timed out while 
read-repairing after receiving all {} data and digest responses", blockFor); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + return callback; + } + + public void reportMutation(InetAddressAndPort endpoint, Mutation mutation) + { + AsyncOneResponse<?> response = sendRepairMutation(mutation, endpoint); + + if (response != null) + responses.add(response); + } + + public void finish() + { + Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>() + { + public void onSuccess(@Nullable List<Object> result) + { + set(result); + } + + public void onFailure(Throwable t) + { + setException(t); + } + }); + } + } + + public void awaitRepairs(long timeout) + { + try + { + Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException ex) + { + // We got all responses, but timed out while repairing + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + int blockFor = consistency.blockFor(keyspace); + if (Tracing.isTracing()) + Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor); + else + logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor); + + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + } + + public PartitionRepair startPartitionRepair() + { + BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size(), command, consistency); + repairs.add(repair); + return repair; + } + - public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) ++ public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + { + ReadRepairMetrics.repairedBlocking.mark(); + + // Do a full data read to resolve the correct response (and repair node that need be) + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this); + ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command, + keyspace, allEndpoints, queryStartNanoTime, this); + + digestRepair = new DigestRepair(resolver, readCallback, resultConsumer); + + for (InetAddressAndPort endpoint : contactedEndpoints) + { + Tracing.trace("Enqueuing full data read to {}", endpoint); + MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback); + } + } + - public void awaitForegroundRepairFinish() throws ReadTimeoutException ++ public void awaitRepair() throws ReadTimeoutException + { + if (digestRepair != null) + { + digestRepair.readCallback.awaitResults(); + digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve()); + } + } - - public void maybeStartBackgroundRepair(ResponseResolver resolver) - { - TraceState traceState = Tracing.instance.get(); - if (traceState != null) - traceState.trace("Initiating read-repair"); - StageManager.getStage(Stage.READ_REPAIR).execute(() -> resolver.evaluateAllResponses(traceState)); - } - - public void backgroundDigestRepair(TraceState traceState) - { - if (traceState != null) - traceState.trace("Digest 
mismatch"); - if (logger.isDebugEnabled()) - logger.debug("Digest mismatch"); - - ReadRepairMetrics.repairedBackground.mark(); - - Keyspace keyspace = Keyspace.open(command.metadata().keyspace); - final DataResolver repairResolver = new DataResolver(keyspace, command, consistency, endpoints.size(), queryStartNanoTime, this); - AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size()); - - for (InetAddressAndPort endpoint : endpoints) - MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler); - } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 39f5bff,0000000..4436f3a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@@ -1,62 -1,0 +1,49 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.List; +import java.util.function.Consumer; + +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.reads.DigestResolver; - import org.apache.cassandra.service.reads.ResponseResolver; - import org.apache.cassandra.tracing.TraceState; + +public class NoopReadRepair implements ReadRepair +{ + public static final NoopReadRepair instance = new NoopReadRepair(); + + private NoopReadRepair() {} + + public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + { + return UnfilteredPartitionIterators.MergeListener.NOOP; + } + - public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) ++ public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + { + resultConsumer.accept(digestResolver.getData()); + } + - public void awaitForegroundRepairFinish() throws ReadTimeoutException ++ public void awaitRepair() throws ReadTimeoutException + { - - } - - public void maybeStartBackgroundRepair(ResponseResolver resolver) - { - - } - - public void backgroundDigestRepair(TraceState traceState) - { - + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index 21cab20,0000000..289875d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@@ -1,73 -1,0 +1,56 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
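NoopReadRepair above is a null-object implementation: callers always receive a ReadRepair instance to invoke, and the "no repair" case is an object whose methods do essentially nothing, so call sites never branch on null. The shape of the pattern, with made-up interface and method names for illustration:

// Illustrative names only; not the actual ReadRepair interface.
interface Repairer
{
    void start();
    void await();

    // Shared do-nothing instance used wherever repair is not wanted.
    Repairer NONE = new Repairer()
    {
        @Override
        public void start() { /* intentionally empty */ }

        @Override
        public void await() { /* intentionally empty */ }
    };
}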
+ */ - +package org.apache.cassandra.service.reads.repair; + +import java.util.List; +import java.util.function.Consumer; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.reads.DigestResolver; - import org.apache.cassandra.service.reads.ResponseResolver; - import org.apache.cassandra.tracing.TraceState; + +public interface ReadRepair +{ - + /** + * Used by DataResolver to generate corrections as the partition iterator is consumed + */ + UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints); + + /** + * Called when the digests from the initial read don't match. Reads may block on the + * repair started by this method. + */ - public void startForegroundRepair(DigestResolver digestResolver, - List<InetAddressAndPort> allEndpoints, - List<InetAddressAndPort> contactedEndpoints, - Consumer<PartitionIterator> resultConsumer); - - /** - * Wait for any operations started by {@link ReadRepair#startForegroundRepair} to complete - * @throws ReadTimeoutException - */ - public void awaitForegroundRepairFinish() throws ReadTimeoutException; - - /** - * Called when responses from all replicas have been received. Read will not block on this. - * @param resolver - */ - public void maybeStartBackgroundRepair(ResponseResolver resolver); ++ public void startRepair(DigestResolver digestResolver, ++ List<InetAddressAndPort> allEndpoints, ++ List<InetAddressAndPort> contactedEndpoints, ++ Consumer<PartitionIterator> resultConsumer); + + /** - * If {@link ReadRepair#maybeStartBackgroundRepair} was called with a {@link DigestResolver}, this will - * be called to perform a repair if there was a digest mismatch ++ * Wait for any operations started by {@link ReadRepair#startRepair} to complete + */ - public void backgroundDigestRepair(TraceState traceState); ++ public void awaitRepair() throws ReadTimeoutException; + + static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency) + { + return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java index 487ed65,ac8b4f7..c29760e --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@@ -74,15 -67,10 +74,14 @@@ public final class TraceKeyspac + "thread text," + "PRIMARY KEY ((session_id), event_id))"); - private static CFMetaData compile(String name, String description, String schema) + private static TableMetadata parse(String table, String description, String cql) { - return CFMetaData.compile(String.format(schema, name), SchemaConstants.TRACE_KEYSPACE_NAME) - .comment(description); + return CreateTableStatement.parse(format(cql, table), SchemaConstants.TRACE_KEYSPACE_NAME) + .id(TableId.forSystemTable(SchemaConstants.TRACE_KEYSPACE_NAME, table)) - .dcLocalReadRepairChance(0.0) + .gcGraceSeconds(0) + .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) + 
.comment(description) + .build(); } public static KeyspaceMetadata metadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java index 406f27a,6fdedc2..71d632d --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java @@@ -107,8 -107,8 +107,6 @@@ public class OverflowTest extends CQLTe { createTable("CREATE TABLE %s ( k int PRIMARY KEY, c int ) WITH " + "comment = 'My comment' " -- + "AND read_repair_chance = 0.5 " -- + "AND dclocal_read_repair_chance = 0.5 " + "AND gc_grace_seconds = 4 " + "AND bloom_filter_fp_chance = 0.01 " + "AND compaction = { 'class' : 'LeveledCompactionStrategy', 'sstable_size_in_mb' : 10, 'fanout_size' : 5 } " @@@ -117,8 -117,8 +115,6 @@@ execute("ALTER TABLE %s WITH " + "comment = 'other comment' " -- + "AND read_repair_chance = 0.3 " -- + "AND dclocal_read_repair_chance = 0.3 " + "AND gc_grace_seconds = 100 " + "AND bloom_filter_fp_chance = 0.1 " + "AND compaction = { 'class': 'SizeTieredCompactionStrategy', 'min_sstable_size' : 42 } " http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/TableCQLHelperTest.java index cfc9686,0000000..1e465b3 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java +++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java @@@ -1,447 -1,0 +1,443 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.io.FileReader; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.*; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.*; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.statements.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.index.sasi.*; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; +import org.apache.cassandra.utils.*; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TableCQLHelperTest extends CQLTester +{ + @Before + public void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + } + + @Test + public void testUserTypesCQL() + { + String keyspace = "cql_test_keyspace_user_types"; + String table = "test_table_user_types"; + + UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"), + Arrays.asList(FieldIdentifier.forUnquoted("a1"), + FieldIdentifier.forUnquoted("a2"), + FieldIdentifier.forUnquoted("a3")), + Arrays.asList(IntegerType.instance, + IntegerType.instance, + IntegerType.instance), + true); + + UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"), + Arrays.asList(FieldIdentifier.forUnquoted("b1"), + FieldIdentifier.forUnquoted("b2"), + FieldIdentifier.forUnquoted("b3")), + Arrays.asList(typeA, + typeA, + typeA), + true); + + UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"), + Arrays.asList(FieldIdentifier.forUnquoted("c1"), + FieldIdentifier.forUnquoted("c2"), + FieldIdentifier.forUnquoted("c3")), + Arrays.asList(typeB, + typeB, + typeB), + true); + + TableMetadata cfm = + TableMetadata.builder(keyspace, table) + .addPartitionKeyColumn("pk1", IntegerType.instance) + .addClusteringColumn("ck1", IntegerType.instance) + .addRegularColumn("reg1", typeC) + .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false)) + .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true)) + .build(); + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC)); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (a1 varint, a2 varint, a3 varint);", + "CREATE TYPE cql_test_keyspace_user_types.b (b1 a, b2 a, b3 a);", + "CREATE TYPE cql_test_keyspace_user_types.c (c1 b, c2 b, c3 b);"), + TableCQLHelper.getUserTypesAsCQL(cfs.metadata())); + } + + @Test + public void testDroppedColumnsCQL() + { + String keyspace = "cql_test_keyspace_dropped_columns"; + String table = "test_table_dropped_columns"; + + TableMetadata.Builder builder = + TableMetadata.builder(keyspace, table) + .addPartitionKeyColumn("pk1", IntegerType.instance) + .addClusteringColumn("ck1", IntegerType.instance) + .addRegularColumn("reg1", IntegerType.instance) + .addRegularColumn("reg2", IntegerType.instance) + .addRegularColumn("reg3", IntegerType.instance); + + ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1")); + ColumnMetadata reg2 = 
builder.getColumn(ByteBufferUtil.bytes("reg2")); + ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3")); + + builder.removeRegularOrStaticColumn(reg1.name) + .removeRegularOrStaticColumn(reg2.name) + .removeRegularOrStaticColumn(reg3.name); + + builder.recordColumnDrop(reg1, 10000) + .recordColumnDrop(reg2, 20000) + .recordColumnDrop(reg3, 30000); + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;", + "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;", + "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"), + TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata())); + + assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith( + "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" + + "\tpk1 varint,\n" + + "\tck1 varint,\n" + + "\treg1 varint,\n" + + "\treg3 varint,\n" + + "\treg2 varint,\n" + + "\tPRIMARY KEY (pk1, ck1))")); + } + + @Test + public void testReaddedColumns() + { + String keyspace = "cql_test_keyspace_readded_columns"; + String table = "test_table_readded_columns"; + + TableMetadata.Builder builder = + TableMetadata.builder(keyspace, table) + .addPartitionKeyColumn("pk1", IntegerType.instance) + .addClusteringColumn("ck1", IntegerType.instance) + .addRegularColumn("reg1", IntegerType.instance) + .addStaticColumn("reg2", IntegerType.instance) + .addRegularColumn("reg3", IntegerType.instance); + + ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1")); + ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2")); + + builder.removeRegularOrStaticColumn(reg1.name); + builder.removeRegularOrStaticColumn(reg2.name); + + builder.recordColumnDrop(reg1, 10000); + builder.recordColumnDrop(reg2, 20000); + + builder.addColumn(reg1); + builder.addColumn(reg2); + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + // when re-adding, column is present in CREATE, then in DROP and then in ADD again, to record DROP with a proper timestamp + assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith( + "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" + + "\tpk1 varint,\n" + + "\tck1 varint,\n" + + "\treg2 varint static,\n" + + "\treg1 varint,\n" + + "\treg3 varint,\n" + + "\tPRIMARY KEY (pk1, ck1))")); + + assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING TIMESTAMP 10000;", + "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;", + "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg2 USING TIMESTAMP 20000;", + "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg2 varint static;"), + TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata())); + } + + @Test + public void testCfmColumnsCQL() + { + String keyspace = "cql_test_keyspace_create_table"; + String table = "test_table_create_table"; + + TableMetadata.Builder metadata = + TableMetadata.builder(keyspace, table) + .addPartitionKeyColumn("pk1", 
IntegerType.instance) + .addPartitionKeyColumn("pk2", AsciiType.instance) + .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance)) + .addClusteringColumn("ck2", IntegerType.instance) + .addStaticColumn("st1", AsciiType.instance) + .addRegularColumn("reg1", AsciiType.instance) + .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false)) + .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true)); + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith( + "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" + + "\tpk1 varint,\n" + + "\tpk2 ascii,\n" + + "\tck1 varint,\n" + + "\tck2 varint,\n" + + "\tst1 ascii static,\n" + + "\treg1 ascii,\n" + + "\treg2 frozen<list<varint>>,\n" + + "\treg3 map<ascii, varint>,\n" + + "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" + + "\tWITH ID = " + cfs.metadata.id + "\n" + + "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)")); + } + + @Test + public void testCfmOptionsCQL() + { + String keyspace = "cql_test_keyspace_options"; + String table = "test_table_options"; + + TableMetadata.Builder builder = TableMetadata.builder(keyspace, table); + builder.addPartitionKeyColumn("pk1", IntegerType.instance) + .addClusteringColumn("cl1", IntegerType.instance) + .addRegularColumn("reg1", AsciiType.instance) + .bloomFilterFpChance(1.0) + .comment("comment") + .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1"))) + .compression(CompressionParams.lz4(1 << 16, 1 << 15)) - .dcLocalReadRepairChance(0.2) + .crcCheckChance(0.3) + .defaultTimeToLive(4) + .gcGraceSeconds(5) + .minIndexInterval(6) + .maxIndexInterval(7) + .memtableFlushPeriod(8) - .readRepairChance(0.9) + .speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE) + .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes()))) + .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance), + FBUtilities.timestampMicros()); + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).endsWith( + "AND bloom_filter_fp_chance = 1.0\n" + - "\tAND dclocal_read_repair_chance = 0.2\n" + + "\tAND crc_check_chance = 0.3\n" + + "\tAND default_time_to_live = 4\n" + + "\tAND gc_grace_seconds = 5\n" + + "\tAND min_index_interval = 6\n" + + "\tAND max_index_interval = 7\n" + + "\tAND memtable_flush_period_in_ms = 8\n" + - "\tAND read_repair_chance = 0.9\n" + + "\tAND speculative_retry = 'ALWAYS'\n" + + "\tAND comment = 'comment'\n" + + "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" + + "\tAND compaction = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'sstable_size_in_mb': '1' }\n" + + "\tAND compression = { 'chunk_length_in_kb': '64', 'min_compress_ratio': '2.0', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }\n" + + "\tAND cdc = false\n" + + "\tAND extensions = { 'ext1': 0x76616c31 };" + )); + } + + @Test + public void testCfmIndexJson() + { + String keyspace = "cql_test_keyspace_3"; + String table = "test_table_3"; + + TableMetadata.Builder builder = + TableMetadata.builder(keyspace, table) + .addPartitionKeyColumn("pk1", 
IntegerType.instance) + .addClusteringColumn("cl1", IntegerType.instance) + .addRegularColumn("reg1", AsciiType.instance); + + ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true); + + builder.indexes( + Indexes.of(IndexMetadata.fromIndexTargets( + Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)), + "indexName", + IndexMetadata.Kind.COMPOSITES, + Collections.emptyMap()), + IndexMetadata.fromIndexTargets( + Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)), + "indexName2", + IndexMetadata.Kind.COMPOSITES, + Collections.emptyMap()), + IndexMetadata.fromIndexTargets( + Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)), + "indexName3", + IndexMetadata.Kind.COMPOSITES, + Collections.emptyMap()), + IndexMetadata.fromIndexTargets( + Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)), + "indexName4", + IndexMetadata.Kind.CUSTOM, + Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName())))); + + + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder); + + ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table); + + assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));", + "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));", + "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));", + "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"), + TableCQLHelper.getIndexesAsCQL(cfs.metadata())); + } + + private final static String SNAPSHOT = "testsnapshot"; + + @Test + public void testSnapshot() throws Throwable + { + String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);"); + String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);"); + String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);"); + + String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" + + "pk1 varint," + + "pk2 ascii," + + "ck1 varint," + + "ck2 varint," + + "reg1 " + typeC + "," + + "reg2 int," + + "reg3 int," + + "PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " + + "CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);"); + + alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;"); + alterTable("ALTER TABLE %s ADD reg3 int;"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES (?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5); + + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName); + cfs.snapshot(SNAPSHOT); + + String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset()); + assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA))); + assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA))); + assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (b1 frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, typeA))); + assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (c1 frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, typeB))); + + schema = schema.substring(schema.indexOf("CREATE 
TABLE")); // trim to ensure order + + assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" + + "\tpk1 varint,\n" + + "\tpk2 ascii,\n" + + "\tck1 varint,\n" + + "\tck2 varint,\n" + + "\treg2 int,\n" + + "\treg3 int,\n" + + "\treg1 " + typeC + ",\n" + + "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" + + "\tWITH ID = " + cfs.metadata.id + "\n" + + "\tAND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)")); + + schema = schema.substring(schema.indexOf("ALTER")); + assertTrue(schema.startsWith(String.format("ALTER TABLE %s.%s DROP reg3 USING TIMESTAMP 10000;", keyspace(), tableName))); + assertTrue(schema.contains(String.format("ALTER TABLE %s.%s ADD reg3 int;", keyspace(), tableName))); + + JSONObject manifest = (JSONObject) new JSONParser().parse(new FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT))); + JSONArray files = (JSONArray) manifest.get("files"); + Assert.assertEquals(1, files.size()); + } + + @Test + public void testSystemKsSnapshot() throws Throwable + { + ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers"); + cfs.snapshot(SNAPSHOT); + + Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists()); + Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists()); + } + + @Test + public void testDroppedType() throws Throwable + { + String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);"); + String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);"); + + String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" + + "pk1 varint," + + "ck1 varint," + + "reg1 " + typeB + "," + + "reg2 varint," + + "PRIMARY KEY (pk1, ck1));"); + + alterTable("ALTER TABLE %s DROP reg1 USING TIMESTAMP 10000;"); + + Runnable validate = () -> { + try + { + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName); + cfs.snapshot(SNAPSHOT); + String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset()); + + // When both column and it's type are dropped, the type in column definition gets substituted with a tuple + assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" + + "\tpk1 varint,\n" + + "\tck1 varint,\n" + + "\treg2 varint,\n" + + "\treg1 frozen<tuple<frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>>>,\n" + + "\tPRIMARY KEY (pk1, ck1))")); + assertTrue(schema.contains("ALTER TABLE " + keyspace() + "." + tableName + " DROP reg1 USING TIMESTAMP 10000;")); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }; + + // Validate before and after the type drop + validate.run(); + schemaChange("DROP TYPE " + keyspace() + "." + typeB); + schemaChange("DROP TYPE " + keyspace() + "." + typeA); + validate.run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
