Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8bbe2f59 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8bbe2f59 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8bbe2f59 Branch: refs/heads/cassandra-3.X Commit: 8bbe2f5951be9ddfd74e455ba20a09b3dc309cfe Parents: 88c5956 36ce4e0 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Tue Dec 13 10:21:58 2016 +0000 Committer: Sam Tunnicliffe <s...@beobal.com> Committed: Tue Dec 13 10:29:08 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/jvm.options | 5 + src/java/org/apache/cassandra/db/Keyspace.java | 23 ---- .../cassandra/index/SecondaryIndexManager.java | 109 ++++++++++++---- .../internal/CollatedViewIndexBuilder.java | 3 +- .../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++- 6 files changed, 222 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 7413086,a65a147..2156ab9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -165,11 -55,6 +165,12 @@@ Merged from 3.0 * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776) * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545) Merged from 2.2: ++ * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796) + * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980) + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935) + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914) + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899) + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281) * Fix 
purgeability of tombstones with max timestamp (CASSANDRA-12792) * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901) * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/conf/jvm.options ---------------------------------------------------------------------- diff --cc conf/jvm.options index 0e329d6,a7b3bd8..f91466a --- a/conf/jvm.options +++ b/conf/jvm.options @@@ -8,138 -8,6 +8,143 @@@ # - dynamic flags will be appended to these on cassandra-env # ########################################################################### +###################### +# STARTUP PARAMETERS # +###################### + +# Uncomment any of the following properties to enable specific startup parameters + +# In a multi-instance deployment, multiple Cassandra instances will independently assume that all +# CPU processors are available to it. This setting allows you to specify a smaller set of processors +# and perhaps have affinity. +#-Dcassandra.available_processors=number_of_processors + +# The directory location of the cassandra.yaml file. +#-Dcassandra.config=directory + +# Sets the initial partitioner token for a node the first time the node is started. +#-Dcassandra.initial_token=token + +# Set to false to start Cassandra on a node but not have the node join the cluster. +#-Dcassandra.join_ring=true|false + +# Set to false to clear all gossip state for the node on restart. Use when you have changed node +# information in cassandra.yaml (such as listen_address). +#-Dcassandra.load_ring_state=true|false + +# Enable pluggable metrics reporter. See Pluggable metrics reporting in Cassandra 2.0.2. +#-Dcassandra.metricsReporterConfigFile=file + +# Set the port on which the CQL native transport listens for clients. (Default: 9042) +#-Dcassandra.native_transport_port=port + +# Overrides the partitioner. 
(Default: org.apache.cassandra.dht.Murmur3Partitioner) +#-Dcassandra.partitioner=partitioner + +# To replace a node that has died, restart a new node in its place specifying the address of the +# dead node. The new node must not have any data in its data directory, that is, it must be in the +# same state as before bootstrapping. +#-Dcassandra.replace_address=listen_address or broadcast_address of dead node + +# Allow restoring specific tables from an archived commit log. +#-Dcassandra.replayList=table + +# Allows overriding of the default RING_DELAY (1000ms), which is the amount of time a node waits +# before joining the ring. +#-Dcassandra.ring_delay_ms=ms + +# Set the port for the Thrift RPC service, which is used for client connections. (Default: 9160) +#-Dcassandra.rpc_port=port + +# Set the SSL port for encrypted communication. (Default: 7001) +#-Dcassandra.ssl_storage_port=port + +# Enable or disable the native transport server. See start_native_transport in cassandra.yaml. +#-Dcassandra.start_native_transport=true|false + +# Enable or disable the Thrift RPC server. (Default: true) +#-Dcassandra.start_rpc=true|false + +# Set the port for inter-node communication. (Default: 7000) +#-Dcassandra.storage_port=port + +# Set the default location for the trigger JARs. (Default: conf/triggers) +#-Dcassandra.triggers_dir=directory + +# For testing new compaction and compression strategies. It allows you to experiment with different +# strategies and benchmark write performance differences without affecting the production workload. +#-Dcassandra.write_survey=true + +# To disable configuration via JMX of auth caches (such as those for credentials, permissions and +# roles). This will mean those config options can only be set (persistently) in cassandra.yaml +# and will require a restart for new values to take effect. 
+#-Dcassandra.disable_auth_caches_remote_configuration=true + ++# To disable dynamic calculation of the page size used when indexing an entire partition (during ++# initial index build/rebuild). If set to true, the page size will be fixed to the default of ++# 10000 rows per page. ++#-Dcassandra.force_default_indexing_page_size=true ++ +######################## +# GENERAL JVM SETTINGS # +######################## + +# enable assertions. highly suggested for correct application functionality. +-ea + +# enable thread priorities, primarily so we can give periodic tasks +# a lower priority to avoid interfering with client workload +-XX:+UseThreadPriorities + +# allows lowering thread priority without being root on linux - probably +# not necessary on Windows but doesn't harm anything. +# see http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workar +-XX:ThreadPriorityPolicy=42 + +# Enable heap-dump if there's an OOM +-XX:+HeapDumpOnOutOfMemoryError + +# Per-thread stack size. +-Xss256k + +# Larger interned string table, for gossip's benefit (CASSANDRA-6410) +-XX:StringTableSize=1000003 + +# Make sure all memory is faulted and zeroed on startup. +# This helps prevent soft faults in containers and makes +# transparent hugepage allocation more effective. +-XX:+AlwaysPreTouch + +# Disable biased locking as it does not benefit Cassandra. +-XX:-UseBiasedLocking + +# Enable thread-local allocation buffers (TLABs) and allow the JVM to automatically +# resize them at runtime. +-XX:+UseTLAB +-XX:+ResizeTLAB +-XX:+UseNUMA + +# http://www.evanjones.ca/jvm-mmap-pause.html +-XX:+PerfDisableSharedMem + +# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See +# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version: +# comment out this entry to enable IPv6 support). 
+-Djava.net.preferIPv4Stack=true + +### Debug options + +# uncomment to enable flight recorder +#-XX:+UnlockCommercialFeatures +#-XX:+FlightRecorder + +# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414 +#-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1414 + +# uncomment to have Cassandra JVM log internal method compilation (developers only) +#-XX:+UnlockDiagnosticVMOptions +#-XX:+LogCompilation + ################# # HEAP SETTINGS # ################# http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 6e36511,003b624..ccb91c1 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@@ -55,7 -56,10 +56,10 @@@ import org.apache.cassandra.index.trans import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; + import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; ++import org.apache.cassandra.transport.ProtocolVersion; + import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Refs; @@@ -536,35 -523,55 +543,55 @@@ public class SecondaryIndexManager impl /** * When building an index against existing data in sstables, add the given partition to the index */ - public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int 
nowInSec) + public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize) { + if (logger.isTraceEnabled()) + logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey())); + if (!indexes.isEmpty()) { - DecoratedKey key = partition.partitionKey(); - Set<Index.Indexer> indexers = indexes.stream() - .map(index -> index.indexerFor(key, - partition.columns(), - nowInSec, - opGroup, - IndexTransaction.Type.UPDATE)) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - - indexers.forEach(Index.Indexer::begin); - - try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec)) + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, + FBUtilities.nowInSeconds(), + key); + int nowInSec = cmd.nowInSec(); + boolean readStatic = false; + - SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION); ++ SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT); + while (!pager.isExhausted()) { - if (!filtered.staticRow().isEmpty()) - indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow())); - - while (filtered.hasNext()) - try (ReadOrderGroup readGroup = cmd.startOrderGroup(); ++ try (ReadExecutionController controller = cmd.executionController(); + OpOrder.Group writeGroup = Keyspace.writeOrder.start(); + RowIterator partition = - PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup), ++ PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize, controller), + cmd)) { - Row row = filtered.next(); - indexers.forEach(indexer -> indexer.insertRow(row)); + Set<Index.Indexer> indexers = indexes.stream() + .map(index -> index.indexerFor(key, + partition.columns(), + nowInSec, + writeGroup, + IndexTransaction.Type.UPDATE)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + indexers.forEach(Index.Indexer::begin); + + // only process the static row 
once per partition + if (!readStatic && !partition.staticRow().isEmpty()) + { + indexers.forEach(indexer -> indexer.insertRow(partition.staticRow())); + readStatic = true; + } + + while (partition.hasNext()) + { + Row row = partition.next(); + indexers.forEach(indexer -> indexer.insertRow(row)); + } + + indexers.forEach(Index.Indexer::finish); } } - - indexers.forEach(Index.Indexer::finish); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java index 8ea7a68,0000000..811d857 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java @@@ -1,78 -1,0 +1,79 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.index.internal; + +import java.util.Set; +import java.util.UUID; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionInfo; +import org.apache.cassandra.db.compaction.CompactionInterruptedException; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.index.SecondaryIndexBuilder; +import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.utils.UUIDGen; + +/** + * Manages building an entire index from column family data. Runs on the compaction manager. + */ +public class CollatedViewIndexBuilder extends SecondaryIndexBuilder +{ + private final ColumnFamilyStore cfs; + private final Set<Index> indexers; + private final ReducingKeyIterator iter; + private final UUID compactionId; + + public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter) + { + this.cfs = cfs; + this.indexers = indexers; + this.iter = iter; + this.compactionId = UUIDGen.getTimeUUID(); + } + + public CompactionInfo getCompactionInfo() + { + return new CompactionInfo(cfs.metadata, + OperationType.INDEX_BUILD, + iter.getBytesRead(), + iter.getTotalBytes(), + compactionId); + } + + public void build() + { + try + { ++ int pageSize = cfs.indexManager.calculateIndexingPageSize(); + while (iter.hasNext()) + { + if (isStopRequested()) + throw new CompactionInterruptedException(getCompactionInfo()); + DecoratedKey key = iter.next(); - Keyspace.indexPartition(key, cfs, indexers); ++ cfs.indexManager.indexPartition(key, indexers, pageSize); + } + } + finally + { + iter.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/8bbe2f59/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --cc 
test/unit/org/apache/cassandra/index/CustomIndexTest.java index 4a43210,33e7182..a462e2f --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@@ -46,11 -44,15 +44,15 @@@ import org.apache.cassandra.db.marshal. import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; + import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.InvalidRequestException; + import org.apache.cassandra.index.transactions.IndexTransaction; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; + import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.concurrent.OpOrder; import static org.apache.cassandra.Util.throwAssert; import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;