Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9

Branch: refs/heads/cassandra-3.11
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Aug 29 12:31:27 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                    |  1 +
 .../UnfilteredPartitionIterators.java          |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java | 35 ++++++++++++++
 .../apache/cassandra/db/transform/Filter.java  | 28 +++--------
 .../db/transform/FilteredPartitions.java       | 18 +++++--
 .../cassandra/db/transform/FilteredRows.java   |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java |  4 ++
 .../apache/cassandra/service/DataResolver.java | 51 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java   | 23 ++++-----
 9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-3.0.15
+3.11.1
+ * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
+ * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
+ * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
+ * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
+ * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
+Merged from 3.0:
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)
  * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
          return Transformation.apply(toReturn, new Close());
      }

 +    public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
 +    {
 +        if (iterators.size() == 1)
 +            return iterators.get(0);
 +
 +        class Extend implements MorePartitions<UnfilteredPartitionIterator>
 +        {
 +            int i = 1;
 +            public UnfilteredPartitionIterator moreContents()
 +            {
 +                if (i >= iterators.size())
 +                    return null;
 +                return iterators.get(i++);
 +            }
 +        }
 +        return MorePartitions.extend(iterators.get(0), new Extend());
 +    }
 +
-
-     public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-     {
-         // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-         return filter(merge(iterators, nowInSec, listener), nowInSec);
-     }
-
      public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
      {
          return FilteredPartitions.filter(iterator, nowInSec);
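The new concat method above chains a list of UnfilteredPartitionIterators lazily: MorePartitions.extend hands back the first iterator and only asks moreContents() for the next source once the current one is exhausted, and the single-iterator case short-circuits without any wrapper. A minimal self-contained sketch of the same lazy-chaining pattern over plain java.util.Iterator (ConcatIterator is an illustrative name, not a Cassandra class):

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    final class ConcatIterator<T> implements Iterator<T>
    {
        private final List<Iterator<T>> iterators;
        private int i = 0; // index of the source currently being drained

        ConcatIterator(List<Iterator<T>> iterators)
        {
            this.iterators = iterators;
        }

        public boolean hasNext()
        {
            // Move to the next source only once the current one is exhausted,
            // mirroring how moreContents() above is invoked purely on demand.
            while (i < iterators.size() && !iterators.get(i).hasNext())
                i++;
            return i < iterators.size();
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return iterators.get(i).next();
        }
    }

As in the patch, nothing from a later source is touched until every earlier source has been fully consumed.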

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");

 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +                                                                               new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
 +    public final Meter shortReadProtectionRequests;
 +
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;

      /**
       * stores metrics that will be rolled into a single global metric
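Cassandra's table metrics are built on the Dropwizard Metrics library, and the global PercentRepaired gauge above reports repaired bytes as a percentage of total bytes across qualifying tables, defaulting to 100.0 when there is no data yet. A minimal sketch of the same ratio-gauge pattern against a plain MetricRegistry (the byte counts below are stand-ins for the per-sstable uncompressedLength() sums, not Cassandra code):

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class PercentRepairedGaugeSketch
    {
        public static void main(String[] args)
        {
            MetricRegistry registry = new MetricRegistry();

            // Stand-ins for the repaired/total sstable byte sums computed in the gauge above.
            final long repairedBytes = 750L;
            final long totalBytes = 1000L;

            // Gauges are evaluated on read; report 100% when there is nothing to repair,
            // matching the "total > 0 ? ... : 100.0" convention in the patch.
            registry.register("PercentRepaired",
                              (Gauge<Double>) () -> totalBytes > 0 ? (repairedBytes * 100.0) / totalBytes : 100.0);

            System.out.println(registry.getGauges().get("PercentRepaired").getValue()); // 75.0
        }
    }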

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
-                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
          }

-         return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
      }

      private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
                  // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
                  // Also note that it's ok if we retrieve more results than necessary since our top level iterator is a
                  // counting iterator.
-                 int n = postReconciliationCounter.countedInCurrentPartition();
-                 int x = counter.countedInCurrentPartition();
+                 int n = countedInCurrentPartition(postReconciliationCounter);
+                 int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / x) - n, 1);
+                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);

                  DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                  ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
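This hunk is the substance of the CASSANDRA-13747 fix. Here n is the row count after reconciliation and x the count from the individual source, and m = (n^2/x) - n estimates how many more rows to request from that source. Two things change: the counts now go through the GROUP BY-aware helpers introduced below, and x is clamped with Math.max(x, 1), so a source whose counter saw nothing in the current partition can no longer cause a division by zero. A standalone sketch of the sizing function (hypothetical name, not the DataResolver method itself):

    // m * x/n = n - x  =>  m = (n^2 / x) - n, with x clamped so it cannot divide
    // by zero, and at least one extra row always requested.
    static int shortReadRetryRows(int n, int x)
    {
        return Math.max(((n * n) / Math.max(x, 1)) - n, 1);
    }

For example, n = 10 and x = 5 gives 100/5 - 10 = 10 extra rows; n = 10 and x = 0, which previously divided by zero, now yields max(100/1 - 10, 1) = 90.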
@@@ -544,44 -547,12 +563,44 @@@
              return doShortReadRetry(cmd);
          }

 +        /**
 +         * Returns the number of results counted by the counter.
 +         *
 +         * @param counter the counter.
 +         * @return the number of results counted by the counter
 +         */
 +        private int counted(Counter counter)
 +        {
 +            // We are interested in the number of rows, but for GROUP BY queries 'counted' returns the number of
 +            // groups.
 +            if (command.limits().isGroupByLimit())
 +                return counter.rowCounted();
 +
 +            return counter.counted();
 +        }
 +
 +        /**
 +         * Returns the number of results counted in the partition by the counter.
 +         *
 +         * @param counter the counter.
 +         * @return the number of results counted in the partition by the counter
 +         */
 +        private int countedInCurrentPartition(Counter counter)
 +        {
 +            // We are interested in the number of rows, but for GROUP BY queries 'countedInCurrentPartition'
 +            // returns the number of groups in the current partition.
 +            if (command.limits().isGroupByLimit())
 +                return counter.rowCountedInCurrentPartition();
 +
 +            return counter.countedInCurrentPartition();
 +        }
 +
          private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
          {
-             DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
-             ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+             DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+             ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
              if (StorageProxy.canDoLocalRequest(source))
-                 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
              else
                  MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
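These two helpers exist because under a GROUP BY limit the DataLimits counters report groups rather than rows, while the short-read sizing above needs actual row counts; the helpers fall back to rowCounted()/rowCountedInCurrentPartition() in that case. A toy illustration of how the two figures diverge (plain Java modelling the idea, not the DataLimits.Counter API):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class GroupVsRowCountSketch
    {
        public static void main(String[] args)
        {
            // Clustering rows read for one partition, tagged with their GROUP BY key.
            List<String> groupKeyPerRow = Arrays.asList("g1", "g1", "g2", "g2", "g2");

            Set<String> groups = new LinkedHashSet<>(groupKeyPerRow); // 2 distinct groups
            int rowCounted = groupKeyPerRow.size();                   // 5 rows actually read

            // Under a GROUP BY limit, 'counted' would report 2 while 5 rows were consumed;
            // the short-read formula needs the 5, hence the rowCounted() variants.
            System.out.println("counted (groups) = " + groups.size() + ", rowCounted = " + rowCounted);
        }
    }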

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReadCommandTest
+{
+    private static final String KEYSPACE = "ReadCommandTest";
+    private static final String CF1 = "Standard1";
+    private static final String CF2 = "Standard2";
+    private static final String CF3 = "Standard3";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        DatabaseDescriptor.daemonInitialization();
+
+        CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
+
+        CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
+                                                 .addPartitionKey("key", BytesType.instance)
+                                                 .addClusteringColumn("col", AsciiType.instance)
+                                                 .addRegularColumn("a", AsciiType.instance)
+                                                 .addRegularColumn("b", AsciiType.instance).build();
+
+        CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
+                                                 .addPartitionKey("key", BytesType.instance)
+                                                 .addClusteringColumn("col", AsciiType.instance)
+                                                 .addRegularColumn("a", AsciiType.instance)
+                                                 .addRegularColumn("b", AsciiType.instance)
+                                                 .addRegularColumn("c", AsciiType.instance)
+                                                 .addRegularColumn("d", AsciiType.instance)
+                                                 .addRegularColumn("e", AsciiType.instance)
+                                                 .addRegularColumn("f", AsciiType.instance).build();
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    metadata1,
+                                    metadata2,
+                                    metadata3);
+    }
+
+    @Test
+    public void testPartitionRangeAbort() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
+            .clustering("Column1")
+            .add("val", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
+            .clustering("Column1")
+            .add("val", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        ReadCommand readCommand = Util.cmd(cfs).build();
+        assertEquals(2, Util.getAll(readCommand).size());
+
+        readCommand.abort();
+        assertEquals(0, Util.getAll(readCommand).size());
+    }
+
+    @Test
+    public void testSinglePartitionSliceAbort() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+        cfs.truncateBlocking();
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+            .clustering("cc")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+
+        List<FilteredPartition> partitions = Util.getAll(readCommand);
+        assertEquals(1, partitions.size());
+        assertEquals(2, partitions.get(0).rowCount());
+
+        readCommand.abort();
+        assertEquals(0, Util.getAll(readCommand).size());
+    }
+
+    @Test
+    public void testSinglePartitionNamesAbort() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+        cfs.truncateBlocking();
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+            .clustering("cc")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        cfs.forceBlockingFlush();
+
+        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+            .clustering("dd")
+            .add("a", ByteBufferUtil.bytes("abcd"))
+            .build()
+            .apply();
+
+        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+
+        List<FilteredPartition> partitions = Util.getAll(readCommand);
+        assertEquals(1, partitions.size());
+        assertEquals(2, partitions.get(0).rowCount());
+
+        readCommand.abort();
+        assertEquals(0, Util.getAll(readCommand).size());
+    }
+
+    @Test
+    public void testSinglePartitionGroupMerge() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
+
+        String[][][] groups = new String[][][] {
+            new String[][] {
+                new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
+                new String[] { "1", "key2", "bb", "b" },
+                new String[] { "1", "key3", "cc", "c" }
+            },
+            new String[][] {
+                new String[] { "1", "key3", "dd", "d" },
+                new String[] { "1", "key2", "ee", "e" },
+                new String[] { "1", "key1", "ff", "f" }
+            },
+            new String[][] {
+                new String[] { "1", "key6", "aa", "a" },
+                new String[] { "1", "key5", "bb", "b" },
+                new String[] { "1", "key4", "cc", "c" }
+            },
+            new String[][] {
+                new String[] { "-1", "key6", "aa", "a" },
+                new String[] { "-1", "key2", "bb", "b" }
+            }
+        };
+
+        // Given the data above, when the keys are sorted and the deletions removed, we should
+        // get these clustering rows in this order
+        String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb" };
+
+        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
+        int nowInSeconds = FBUtilities.nowInSeconds();
+        ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
+        RowFilter rowFilter = RowFilter.create();
+        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
+        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
+
+        for (String[][] group : groups)
+        {
+            cfs.truncateBlocking();
+
+            List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
+
+            for (String[] data : group)
+            {
+                if (data[0].equals("1"))
+                {
+                    new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
+                        .clustering(data[2])
+                        .add(data[3], ByteBufferUtil.bytes("blah"))
+                        .build()
+                        .apply();
+                }
+                else
+                {
+                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
+                }
+                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
+            }
+
+            cfs.forceBlockingFlush();
+
+            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+
+            try (ReadExecutionController executionController = query.executionController();
+                 UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+                 DataOutputBuffer buffer = new DataOutputBuffer())
+            {
+                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+                                                                                columnFilter,
+                                                                                buffer,
+                                                                                MessagingService.current_version);
+                buffers.add(buffer.buffer());
+            }
+        }
+
+        // deserialize, merge and check the results are all there
+        List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
+
+        for (ByteBuffer buffer : buffers)
+        {
+            try (DataInputBuffer in = new DataInputBuffer(buffer, true))
+            {
+                iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
+                                                                                                MessagingService.current_version,
+                                                                                                cfs.metadata,
+                                                                                                columnFilter,
+                                                                                                SerializationHelper.Flag.LOCAL));
+            }
+        }
+
-         try(PartitionIterator partitionIterator = UnfilteredPartitionIterators.mergeAndFilter(iterators,
-                                                                                              nowInSeconds,
-                                                                                              new UnfilteredPartitionIterators.MergeListener()
-         {
-             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++        UnfilteredPartitionIterators.MergeListener listener =
++            new UnfilteredPartitionIterators.MergeListener()
+         {
-             return null;
-         }
++            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++            {
++                return null;
++            }
+
-             public void close()
-             {
++            public void close()
++            {
+
-             }
-         }))
++            }
++        };
++
++        try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
+         {
+
+            int i = 0;
+            int numPartitions = 0;
+            while (partitionIterator.hasNext())
+            {
+                numPartitions++;
+                try(RowIterator rowIterator = partitionIterator.next())
+                {
+                    while (rowIterator.hasNext())
+                    {
+                        Row row = rowIterator.next();
+                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
+                        //System.out.print(row.toString(cfs.metadata, true));
+                    }
+                }
+            }
+
+            assertEquals(5, numPartitions);
+            assertEquals(expectedRows.length, i);
+        }
+    }
+}
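With mergeAndFilter removed, call sites now spell out the two stages explicitly, as the rewritten test does: merge the per-source unfiltered iterators under a listener, then filter the merged stream down to live rows and partitions (per the diffstat, the filtering side gains a dedicated EmptyPartitionsDiscarder transformation for the latter). Condensed from the test above, with the same no-op listener and the same 'iterators' and 'nowInSeconds', the new call shape is:

    UnfilteredPartitionIterators.MergeListener listener = new UnfilteredPartitionIterators.MergeListener()
    {
        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
        {
            return null; // no per-row merge observation needed here
        }

        public void close() {}
    };

    try (PartitionIterator filtered =
             UnfilteredPartitionIterators.filter(
                 UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
    {
        // consume purely-live, non-empty partitions here
    }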
