merge from 1.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f620b348
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f620b348
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f620b348

Branch: refs/heads/trunk
Commit: f620b348a064a55749ffcbc4d25e08c3fe71f1be
Parents: 0680372 b706391
Author: Jonathan Ellis <[email protected]>
Authored: Thu May 23 10:08:06 2013 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Thu May 23 10:08:06 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  9 ++++-
 .../apache/cassandra/db/HintedHandOffManager.java  | 25 +++++++++++
 src/java/org/apache/cassandra/db/RowMutation.java  |  9 ++++
 src/java/org/apache/cassandra/db/SystemTable.java  | 32 ++++++++++-----
 .../cassandra/db/commitlog/CommitLogReplayer.java  | 10 ++---
 6 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b765896,66c5f04..d283b72
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,5 +1,59 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+   (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5443)
+ * Leveled compaction performs size-tiered compactions in L0
+   (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+   need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+   (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+   has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+   (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+   out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+
+ 1.2.6
+ * Ignore pre-truncate hints (CASSANDRA-4655)
  * Move System.exit on OOM into a separate thread (CASSANDRA-5273)
  * Write row markers when serializing schema (CASSANDRA-5572)
  * Check only SSTables for the requested range when streaming (CASSANDRA-5569)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 78825cf,429859e..dcd7814
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1830,116 -1793,11 +1831,116 @@@ public class ColumnFamilyStore implemen
              }
          }

-         long truncatedAt = System.currentTimeMillis();
-         if (DatabaseDescriptor.isAutoSnapshot())
-             snapshot(Table.getTimestampedSnapshotName(columnFamily));
+         Runnable truncateRunnable = new Runnable()
+         {
+             public void run()
+             {
+                 logger.debug("Discarding sstable data for truncated CF + indexes");
+
+                 final long truncatedAt = System.currentTimeMillis();
+                 if (DatabaseDescriptor.isAutoSnapshot())
+                     snapshot(Table.getTimestampedSnapshotName(name));
+
+                 ReplayPosition replayAfter = discardSSTables(truncatedAt);
+
+                 for (SecondaryIndex index : indexManager.getIndexes())
+                     index.truncateBlocking(truncatedAt);
+
-                 SystemTable.saveTruncationPosition(ColumnFamilyStore.this, replayAfter);
++                SystemTable.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+
+                 logger.debug("cleaning out row cache");
+                 for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+                 {
+                     if (key.cfId == metadata.cfId)
+                         CacheService.instance.rowCache.remove(key);
+                 }
+             }
+         };
+
+         runWithCompactionsDisabled(Executors.callable(truncateRunnable), true);
+         logger.debug("truncate complete");
+     }
+
+     public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+     {
+         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
+         // and so we only run one major compaction at a time
+         synchronized (this)
+         {
+             logger.debug("Cancelling in-progress compactions for {}", metadata.cfName);
+
+             Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes();
+             for (ColumnFamilyStore cfs : selfWithIndexes)
+                 cfs.getCompactionStrategy().pause();
+             try
+             {
+                 // interrupt in-progress compactions
+                 Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>()
+                 {
+                     public CFMetaData apply(ColumnFamilyStore cfs)
+                     {
+                         return cfs.metadata;
+                     }
+                 };
+                 Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f);
+                 CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation);
+
+                 // wait for the interruption to be recognized
+                 long start = System.nanoTime();
+                 long delay = TimeUnit.MINUTES.toNanos(1);
+                 while (System.nanoTime() - start < delay)
+                 {
+                     if (CompactionManager.instance.isCompacting(selfWithIndexes))
+                         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+                     else
+                         break;
+                 }
+
+                 // double-check that we finished, instead of timing out
+                 for (ColumnFamilyStore cfs : selfWithIndexes)
+                 {
+                     if (!cfs.getDataTracker().getCompacting().isEmpty())
+                     {
+                         logger.warn("Unable to cancel in-progress compactions for {}. Probably there is an unusually large row in progress somewhere. It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName);
+                     }
+                 }
+                 logger.debug("Compactions successfully cancelled");
+
+                 // run our task
+                 try
+                 {
+                     return callable.call();
+                 }
+                 catch (Exception e)
+                 {
+                     throw new RuntimeException(e);
+                 }
+             }
+             finally
+             {
+                 for (ColumnFamilyStore cfs : selfWithIndexes)
+                     cfs.getCompactionStrategy().resume();
+             }
+         }
+     }
+
+     public Iterable<SSTableReader> markAllCompacting()
+     {
+         Callable<Iterable<SSTableReader>> callable = new Callable<Iterable<SSTableReader>>()
+         {
+             public Iterable<SSTableReader> call() throws Exception
+             {
+                 assert data.getCompacting().isEmpty() : data.getCompacting();
+                 Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+                 if (Iterables.isEmpty(sstables))
+                     return null;
+                 boolean success = data.markCompacting(sstables);
+                 assert success : "something marked things compacting while compactions are disabled";
+                 return sstables;
+             }
+         };
-         return CompactionManager.instance.submitTruncate(this, truncatedAt);
+         return runWithCompactionsDisabled(callable, false);
      }

      public long getBloomFilterFalsePositives()
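
For readers following the change: the new runWithCompactionsDisabled above follows a pause / interrupt / bounded-wait / run / resume shape. Below is a minimal self-contained sketch of that shape, with illustrative names only (PauseThenRun, runWhilePaused, and the AtomicBoolean flags are not Cassandra APIs; the real code pauses compaction strategies and polls CompactionManager instead):

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class PauseThenRun
    {
        private final AtomicBoolean paused = new AtomicBoolean(false); // checked by worker threads before starting work
        private final AtomicBoolean busy = new AtomicBoolean(false);   // set by worker threads while working

        public synchronized <V> V runWhilePaused(Callable<V> task) throws Exception
        {
            paused.set(true);                             // 1. stop new background work from starting
            try
            {
                long start = System.nanoTime();           // 2. bounded wait for in-flight work to drain,
                long delay = TimeUnit.MINUTES.toNanos(1); //    mirroring the one-minute nanoTime loop above
                while (busy.get() && System.nanoTime() - start < delay)
                    TimeUnit.MILLISECONDS.sleep(100);
                return task.call();                       // 3. run the exclusive task
            }
            finally
            {
                paused.set(false);                        // 4. always resume, even if the task throws
            }
        }
    }

The synchronized block plays the same role as in the diff: one exclusive task at a time, and a concurrent caller cannot re-enable background work partway through.
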

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowMutation.java
index b08055f,b85cfcd..d78247b
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@@ -238,14 -311,65 +238,23 @@@ public class RowMutation implements IMu
          return buff.append("])").toString();
      }

-     public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
-     {
-         if (cosc.super_column != null)
-         {
-             for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
-             {
-                 add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value, column.timestamp, column.ttl);
-             }
-         }
-         else if (cosc.column != null)
-         {
-             add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
-         }
-         else if (cosc.counter_super_column != null)
-         {
-             for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
-             {
-                 addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name), column.value);
-             }
-         }
-         else // cosc.counter_column != null
-         {
-             addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
-         }
-     }
-
-     public void deleteColumnOrSuperColumn(String cfName, Deletion del)
-     {
-         if (del.predicate != null && del.predicate.column_names != null)
-         {
-             for(ByteBuffer c : del.predicate.column_names)
-             {
-                 if (del.super_column == null && Schema.instance.getColumnFamilyType(table, cfName) == ColumnFamilyType.Super)
-                     delete(new QueryPath(cfName, c), del.timestamp);
-                 else
-                     delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
-             }
-         }
-         else
-         {
-             delete(new QueryPath(cfName, del.super_column), del.timestamp);
-         }
-     }
-
+     public RowMutation without(UUID cfId)
+     {
+         RowMutation rm = new RowMutation(table, key);
+         for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+             if (!entry.getKey().equals(cfId))
+                 rm.add(entry.getValue());
+         return rm;
+     }
+
      public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
      {
-         public void serialize(RowMutation rm, DataOutput dos, int version) throws IOException
+         public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
          {
-             dos.writeUTF(rm.getTable());
-             ByteBufferUtil.writeWithShortLength(rm.key(), dos);
+             if (version < MessagingService.VERSION_20)
+                 out.writeUTF(rm.getTable());
+
+             ByteBufferUtil.writeWithShortLength(rm.key(), out);

              /* serialize the modifications in the mutation */
              int size = rm.modifications.size();
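
The serializer change above is the usual versioned-wire-format idiom: a field that 2.0 peers no longer need (the keyspace name) is written only when talking to older versions. A rough sketch of the idiom follows; the VERSION_20 value and field layout here are illustrative placeholders, not the MessagingService constants:

    import java.io.DataOutput;
    import java.io.IOException;

    final class VersionedSerializerSketch
    {
        static final int VERSION_20 = 7; // placeholder; stands in for MessagingService.VERSION_20

        static void serialize(String keyspace, byte[] key, DataOutput out, int version) throws IOException
        {
            if (version < VERSION_20)
                out.writeUTF(keyspace);   // only legacy peers still expect the keyspace name
            out.writeShort(key.length);   // length-prefixed key, like writeWithShortLength
            out.write(key);
        }
    }

The matching deserializer must branch on the same version check so both ends agree on the layout.
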

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemTable.java
index dd818b2,327f01b..9662661
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@@ -45,14 -43,14 +45,14 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.sstable.SSTableReader;
+ import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.IEndpointSnitch;
  import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Constants;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CounterId;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
+import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.utils.*;

  import static org.apache.cassandra.cql3.QueryProcessor.processInternal;

@@@ -126,71 -140,61 +126,81 @@@ public class SystemTabl
          DatabaseDescriptor.getPartitioner().getClass().getName()));
      }

-     /** if system data becomes incompatible across versions of cassandra, that logic (and associated purging) is managed here */
-     private static void upgradeSystemData() throws ExecutionException, InterruptedException
+     /**
+      * Write the compaction log, except for columnfamilies under the system keyspace.
+      *
+      * @param cfs the columnfamily store being compacted
+      * @param toCompact sstables to compact
+      * @return compaction task id, or null if cfs is under the system keyspace
+      */
+     public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
      {
-         Table table = Table.open(Table.SYSTEM_KS);
-         ColumnFamilyStore oldStatusCfs = table.getColumnFamilyStore(OLD_STATUS_CF);
-         if (oldStatusCfs.getSSTables().size() > 0)
+         if (Table.SYSTEM_KS.equals(cfs.table.getName()))
+             return null;
+
+         UUID compactionId = UUIDGen.getTimeUUID();
+         String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (%s, '%s', '%s', {%s})";
+         Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
          {
-             SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance);
-             cols.add(ByteBufferUtil.bytes("ClusterName"));
-             cols.add(ByteBufferUtil.bytes("Token"));
-             QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols);
-             ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter);
-             Iterator<IColumn> oldColumns = oldCf.columns.iterator();
-
-             String clusterName = null;
-             try
-             {
-                 clusterName = ByteBufferUtil.string(oldColumns.next().value());
-             }
-             catch (CharacterCodingException e)
+             public Integer apply(SSTableReader sstable)
              {
-                 throw new RuntimeException(e);
+                 return sstable.descriptor.generation;
              }
-             // serialize the old token as a collection of (one )tokens.
-             Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
-             String tokenBytes = tokensAsSet(Collections.singleton(token));
-             // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
-             String req = "INSERT INTO system.%s (key, cluster_name, tokens, bootstrapped) VALUES ('%s', '%s', %s, '%s')";
-             processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name()));
-
-             oldStatusCfs.truncate();
-         }
+         });
+         processInternal(String.format(req, COMPACTION_LOG, compactionId, cfs.table.getName(), cfs.name, StringUtils.join(Sets.newHashSet(generations), ',')));
+         forceBlockingFlush(COMPACTION_LOG);
+         return compactionId;
+     }
+
+     public static void finishCompaction(UUID taskId)
+     {
+         assert taskId != null;

-         ColumnFamilyStore oldHintsCfs = table.getColumnFamilyStore(OLD_HINTS_CF);
-         if (oldHintsCfs.getSSTables().size() > 0)
+         String req = "DELETE FROM system.%s WHERE id = %s";
+         processInternal(String.format(req, COMPACTION_LOG, taskId));
+         forceBlockingFlush(COMPACTION_LOG);
+     }
+
+     /**
+      * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+      */
+     public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+     {
+         String req = "SELECT * FROM system.%s";
+         UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
+
+         SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+         for (UntypedResultSet.Row row : resultSet)
          {
-             logger.info("Possible old-format hints found. Truncating");
-             oldHintsCfs.truncate();
+             String keyspace = row.getString("keyspace_name");
+             String columnfamily = row.getString("columnfamily_name");
+             Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+
+             unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
          }
+         return unfinishedCompactions;
+     }
+
+     public static void discardCompactionsInProgress()
+     {
+         ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG);
+         compactionLog.truncateBlocking();
      }

-     public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position)
+     public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
      {
          String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'";
-         processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY));
+         processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY));
+         forceBlockingFlush(LOCAL_CF);
+     }
+
+     /**
+      * Remove the truncation-time record for the specified column family.
+      */
+     public static void removeTruncationRecord(UUID cfId)
+     {
+         String req = "DELETE truncation_time['%s'] from system.%s WHERE key = '%s'";
+         processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY));
          forceBlockingFlush(LOCAL_CF);
      }
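
The SystemTable additions above amount to a write-ahead record for compactions: log the input sstables before compacting, delete the record on completion, and treat any record that survives a restart as an apparently unfinished compaction. A minimal in-memory sketch of that lifecycle (the Map stands in for the durable system table; none of these names are Cassandra APIs):

    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    final class CompactionLogSketch
    {
        private final Map<UUID, Set<Integer>> log = new ConcurrentHashMap<>();

        UUID start(Set<Integer> inputGenerations)
        {
            UUID id = UUID.randomUUID();
            log.put(id, inputGenerations); // the real code INSERTs and flushes before compacting
            return id;
        }

        void finish(UUID id)
        {
            log.remove(id);                // the record is cleared only after the compaction completes
        }

        Map<UUID, Set<Integer>> unfinished()
        {
            return log;                    // at startup, anything still here is suspect
        }
    }

Note that startCompaction above returns null for the system keyspace, so the log never tracks its own housekeeping.
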
Truncating"); - oldHintsCfs.truncate(); + String keyspace = row.getString("keyspace_name"); + String columnfamily = row.getString("columnfamily_name"); + Set<Integer> inputs = row.getSet("inputs", Int32Type.instance); + + unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs); } + return unfinishedCompactions; + } + + public static void discardCompactionsInProgress() + { + ColumnFamilyStore compactionLog = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(COMPACTION_LOG); + compactionLog.truncateBlocking(); } - public static void saveTruncationPosition(ColumnFamilyStore cfs, ReplayPosition position) + public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; - processInternal(String.format(req, LOCAL_CF, positionAsMapEntry(cfs, position), LOCAL_KEY)); + processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY)); + forceBlockingFlush(LOCAL_CF); + } + + /** + * This method is used to remove information about truncation time for specified column family + */ + public static void removeTruncationRecord(UUID cfId) + { + String req = "DELETE truncation_time['%s'] from system.%s WHERE key = '%s'"; + processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY)); forceBlockingFlush(LOCAL_CF); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f620b348/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ----------------------------------------------------------------------
