http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index df8820b..848ba01 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -30,7 +30,6 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; @@ -39,17 +38,15 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteFailureException; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.FailureDetector; @@ -61,6 +58,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.OpOrder; import org.cliffc.high_scale_lib.NonBlockingHashSet; /** @@ -91,7 +89,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public static final HintedHandOffManager instance = new HintedHandOffManager(); private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class); - private static final int PAGE_SIZE = 128; + + private static final int MAX_SIMULTANEOUSLY_REPLAYED_HINTS = 128; private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody. public final HintedHandoffMetrics metrics = new HintedHandoffMetrics(); @@ -110,6 +109,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); + private static final ColumnDefinition hintColumn = SystemKeyspace.Hints.compactValueColumn(); + /** * Returns a mutation representing a Hint to be sent to <code>targetId</code> * as soon as it becomes available again. @@ -127,11 +128,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version); + + PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, + StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)), + PartitionColumns.of(hintColumn), + 1); + + Row.Writer writer = upd.writer(); + Rows.writeClustering(SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version), writer); + ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS)); - cf.addColumn(name, value, now, ttl); - return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf); + writer.writeCell(hintColumn, false, value, SimpleLivenessInfo.forUpdate(now, ttl, FBUtilities.nowInSeconds(), SystemKeyspace.Hints), null); + writer.endOfRow(); + + return new Mutation(upd); } /* @@ -142,12 +152,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public static int calculateHintTTL(Mutation mutation) { int ttl = maxHintTTL; - for (ColumnFamily cf : mutation.getColumnFamilies()) - ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds()); + for (PartitionUpdate upd : mutation.getPartitionUpdates()) + ttl = Math.min(ttl, upd.metadata().getGcGraceSeconds()); return ttl; } - public void start() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -172,11 +181,17 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp) + private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) { - Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes); - mutation.delete(SystemKeyspace.HINTS, columnName, timestamp); - mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery + DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes); + + PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, dk, PartitionColumns.of(hintColumn), 1); + + Row.Writer writer = upd.writer(); + Rows.writeClustering(clustering, writer); + Cells.writeTombstone(writer, hintColumn, timestamp, FBUtilities.nowInSeconds()); + + new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } public void deleteHintsForEndpoint(final String ipOrHostname) @@ -198,9 +213,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) return; UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint); - ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); - final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes); - mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis()); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId))); + final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds())); // execute asynchronously to avoid blocking caller (which may be processing gossip) Runnable runnable = new Runnable() @@ -266,13 +280,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } } - private static boolean pagingFinished(ColumnFamily hintColumnFamily, Composite startColumn) - { - // done if no hints found or the start column (same as last column processed in previous iteration) is the only one - return hintColumnFamily == null - || (!startColumn.isEmpty() && hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn((CellName)startColumn) != null); - } - private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException { Gossiper gossiper = Gossiper.instance; @@ -335,6 +342,27 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } doDeliverHintsToEndpoint(endpoint); + + // Flush all the tombstones to disk + hintStore.forceBlockingFlush(); + } + + private boolean checkDelivered(InetAddress endpoint, List<WriteResponseHandler<Mutation>> handlers, AtomicInteger rowsReplayed) + { + for (WriteResponseHandler<Mutation> handler : handlers) + { + try + { + handler.get(); + } + catch (WriteTimeoutException e) + { + logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}", + endpoint, rowsReplayed, e.getMessage()); + return false; + } + } + return true; } /* @@ -352,10 +380,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes); final AtomicInteger rowsReplayed = new AtomicInteger(0); - Composite startColumn = Composites.EMPTY; - - int pageSize = calculatePageSize(); - logger.debug("Using pageSize of {}", pageSize); // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272). @@ -363,55 +387,38 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); - delivery: - while (true) + int nowInSec = FBUtilities.nowInSeconds(); + try (OpOrder.Group op = hintStore.readOrdering.start(); + RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec)) { - long now = System.currentTimeMillis(); - QueryFilter filter = QueryFilter.getSliceFilter(epkey, - SystemKeyspace.HINTS, - startColumn, - Composites.EMPTY, - false, - pageSize, - now); - - ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int) (now / 1000)); - - if (pagingFinished(hintsPage, startColumn)) - { - logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint); - break; - } + List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList(); - // check if node is still alive and we should continue delivery process - if (!FailureDetector.instance.isAlive(endpoint)) + while (iter.hasNext()) { - logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed); - break; - } + // check if node is still alive and we should continue delivery process + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed); + return; + } - List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList(); - for (final Cell hint : hintsPage) - { // check if hints delivery has been paused during the process if (hintedHandOffPaused) { logger.debug("Hints delivery process is paused, aborting"); - break delivery; + return; } - // Skip tombstones: - // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond - // in which the local deletion timestamp was generated on the last column in the old page, in which - // case the hint will have no columns (since it's deleted) but will still be included in the resultset - // since (even with gcgs=0) it's still a "relevant" tombstone. - if (!hint.isLive()) - continue; + // Wait regularly on the endpoint acknowledgment. If we timeout on it, the endpoint is probably dead so stop delivery + if (responseHandlers.size() > MAX_SIMULTANEOUSLY_REPLAYED_HINTS && !checkDelivered(endpoint, responseHandlers, rowsReplayed)) + return; - startColumn = hint.name(); + final Row hint = iter.next(); + int version = Int32Type.instance.compose(hint.clustering().get(1)); + Cell cell = hint.getCell(hintColumn); - int version = Int32Type.instance.compose(hint.name().get(1)); - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value())); + final long timestamp = cell.livenessInfo().timestamp(); + DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(cell.value())); Mutation mutation; try { @@ -420,7 +427,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean catch (UnknownColumnFamilyException e) { logger.debug("Skipping delivery of hint for deleted table", e); - deleteHint(hostIdBytes, hint.name(), hint.timestamp()); + deleteHint(hostIdBytes, hint.clustering(), timestamp); continue; } catch (IOException e) @@ -430,7 +437,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean for (UUID cfId : mutation.getColumnFamilyIds()) { - if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId)) + if (timestamp <= SystemKeyspace.getTruncatedAt(cfId)) { logger.debug("Skipping delivery of hint for truncated table {}", cfId); mutation = mutation.without(cfId); @@ -439,7 +446,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (mutation.isEmpty()) { - deleteHint(hostIdBytes, hint.name(), hint.timestamp()); + deleteHint(hostIdBytes, hint.clustering(), timestamp); continue; } @@ -450,7 +457,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public void run() { rowsReplayed.incrementAndGet(); - deleteHint(hostIdBytes, hint.name(), hint.timestamp()); + deleteHint(hostIdBytes, hint.clustering(), timestamp); } }; WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback); @@ -458,38 +465,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean responseHandlers.add(responseHandler); } - for (WriteResponseHandler<Mutation> handler : responseHandlers) - { - try - { - handler.get(); - } - catch (WriteTimeoutException|WriteFailureException e) - { - logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}", - endpoint, rowsReplayed, e.getMessage()); - break delivery; - } - } + // Wait on the last handlers + if (checkDelivered(endpoint, responseHandlers, rowsReplayed)) + logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint); } - - // Flush all the tombstones to disk - hintStore.forceBlockingFlush(); - } - - // read less columns (mutations) per page if they are very large - private int calculatePageSize() - { - int meanColumnCount = hintStore.getMeanColumns(); - if (meanColumnCount <= 0) - return PAGE_SIZE; - - int averageColumnSize = (int) (hintStore.metric.meanRowSize.getValue() / meanColumnCount); - if (averageColumnSize <= 0) - return PAGE_SIZE; - - // page size of 1 does not allow actual paging b/c of >= behavior on startColumn - return Math.max(2, Math.min(PAGE_SIZE, 4 * 1024 * 1024 / averageColumnSize)); } /** @@ -505,18 +484,26 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean // to deliver to). compact(); - IPartitioner p = StorageService.getPartitioner(); - RowPosition minPos = p.getMinimumToken().minKeyBound(); - Range<RowPosition> range = new Range<>(minPos, minPos); - IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of()); - List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis()); - for (Row row : rows) + ReadCommand cmd = new PartitionRangeReadCommand(hintStore.metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(hintStore.metadata), + RowFilter.NONE, + DataLimits.cqlLimits(Integer.MAX_VALUE, 1), + DataRange.allData(StorageService.getPartitioner())); + + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { - UUID hostId = UUIDGen.getUUID(row.key.getKey()); - InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId); - // token may have since been removed (in which case we have just read back a tombstone) - if (target != null) - scheduleHintDelivery(target, false); + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + UUID hostId = UUIDGen.getUUID(partition.partitionKey().getKey()); + InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId); + // token may have since been removed (in which case we have just read back a tombstone) + if (target != null) + scheduleHintDelivery(target, false); + } + } } logger.debug("Finished scheduleAllDeliveries"); @@ -572,42 +559,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean // Extract the keys as strings to be reported. LinkedList<String> result = new LinkedList<>(); - for (Row row : getHintsSlice(1)) + ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds()); + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { - if (row.cf != null) //ignore removed rows - result.addFirst(tokenFactory.toString(row.key.getToken())); + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + // We don't delete by range on the hints table, so we don't have to worry about the + // iterator returning only range tombstone marker + if (partition.hasNext()) + result.addFirst(tokenFactory.toString(partition.partitionKey().getToken())); + } + } } return result; } - - private List<Row> getHintsSlice(int columnCount) - { - // Get count # of columns... - SliceQueryFilter predicate = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, - false, - columnCount); - - // From keys "" to ""... - IPartitioner partitioner = StorageService.getPartitioner(); - RowPosition minPos = partitioner.getMinimumToken().minKeyBound(); - Range<RowPosition> range = new Range<>(minPos, minPos); - - try - { - RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME, - SystemKeyspace.HINTS, - System.currentTimeMillis(), - predicate, - range, - null, - LARGE_NUMBER); - return StorageProxy.getRangeSlice(cmd, ConsistencyLevel.ONE); - } - catch (Exception e) - { - logger.info("HintsCF getEPPendingHints timed out."); - throw new RuntimeException(e); - } - } - }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 44df104..aad35c3 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -21,13 +21,14 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.UUID; +import org.apache.cassandra.db.partitions.PartitionUpdate; + public interface IMutation { public String getKeyspaceName(); public Collection<UUID> getColumnFamilyIds(); - public ByteBuffer key(); + public DecoratedKey key(); public long getTimeout(); public String toString(boolean shallow); - public void addAll(IMutation m); - public Collection<ColumnFamily> getColumnFamilies(); + public Collection<PartitionUpdate> getPartitionUpdates(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/IndexExpression.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java b/src/java/org/apache/cassandra/db/IndexExpression.java deleted file mode 100644 index bdb74ce..0000000 --- a/src/java/org/apache/cassandra/db/IndexExpression.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import com.google.common.base.Objects; - -import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; - -public final class IndexExpression -{ - public final ByteBuffer column; - public final Operator operator; - public final ByteBuffer value; - - public IndexExpression(ByteBuffer column, Operator operator, ByteBuffer value) - { - this.column = column; - this.operator = operator; - this.value = value; - } - - /** - * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> operator. - * - * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS</code> - * operator, <code>false</code> otherwise. - */ - public boolean isContains() - { - return Operator.CONTAINS == operator; - } - - /** - * Checks if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> operator. - * - * @return <code>true</code> if the operator of this <code>IndexExpression</code> is a <code>CONTAINS_KEY</code> - * operator, <code>false</code> otherwise. - */ - public boolean isContainsKey() - { - return Operator.CONTAINS_KEY == operator; - } - - @Override - public String toString() - { - return String.format("%s %s %s", ByteBufferUtil.bytesToHex(column), operator, ByteBufferUtil.bytesToHex(value)); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - return true; - - if (!(o instanceof IndexExpression)) - return false; - - IndexExpression ie = (IndexExpression) o; - - return Objects.equal(this.column, ie.column) - && Objects.equal(this.operator, ie.operator) - && Objects.equal(this.value, ie.value); - } - - @Override - public int hashCode() - { - return Objects.hashCode(column, operator, value); - } - - /** - * Write the serialized version of this <code>IndexExpression</code> to the specified output. - * - * @param output the output to write to - * @throws IOException if an I/O problem occurs while writing to the specified output - */ - public void writeTo(DataOutputPlus output) throws IOException - { - ByteBufferUtil.writeWithShortLength(column, output); - operator.writeTo(output); - ByteBufferUtil.writeWithShortLength(value, output); - } - - /** - * Deserializes an <code>IndexExpression</code> instance from the specified input. - * - * @param input the input to read from - * @return the <code>IndexExpression</code> instance deserialized - * @throws IOException if a problem occurs while deserializing the <code>IndexExpression</code> instance. - */ - public static IndexExpression readFrom(DataInput input) throws IOException - { - return new IndexExpression(ByteBufferUtil.readWithShortLength(input), - Operator.readFrom(input), - ByteBufferUtil.readWithShortLength(input)); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index cb5c54d..e045466 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -30,18 +30,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.pager.QueryPagers; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.metrics.KeyspaceMetrics; /** @@ -49,8 +50,6 @@ import org.apache.cassandra.metrics.KeyspaceMetrics; */ public class Keyspace { - private static final int DEFAULT_PAGE_SIZE = 10000; - private static final Logger logger = LoggerFactory.getLogger(Keyspace.class); private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", ""); @@ -145,6 +144,11 @@ public class Keyspace } } + public static ColumnFamilyStore openAndGetStore(CFMetaData cfm) + { + return open(cfm.ksName).getColumnFamilyStore(cfm.cfId); + } + /** * Removes every SSTable in the directory from the appropriate Tracker's view. * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. @@ -347,13 +351,6 @@ public class Keyspace } } - public Row getRow(QueryFilter filter) - { - ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName()); - ColumnFamily columnFamily = cfStore.getColumnFamily(filter); - return new Row(filter.key, columnFamily); - } - public void apply(Mutation mutation, boolean writeCommitLog) { apply(mutation, writeCommitLog, true); @@ -372,6 +369,7 @@ public class Keyspace if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); + int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group opGroup = writeOrder.start()) { // write the mutation to the commitlog and memtables @@ -382,21 +380,21 @@ public class Keyspace replayPosition = CommitLog.instance.add(mutation); } - DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key()); - for (ColumnFamily cf : mutation.getColumnFamilies()) + DecoratedKey key = mutation.key(); + for (PartitionUpdate upd : mutation.getPartitionUpdates()) { - ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); + ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId); if (cfs == null) { - logger.error("Attempting to mutate non-existant table {}", cf.id()); + logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId); continue; } - Tracing.trace("Adding to {} memtable", cf.metadata().cfName); + Tracing.trace("Adding to {} memtable", upd.metadata().cfName); SecondaryIndexManager.Updater updater = updateIndexes - ? cfs.indexManager.updaterFor(key, cf, opGroup) + ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec) : SecondaryIndexManager.nullUpdater; - cfs.apply(key, cf, updater, opGroup, replayPosition); + cfs.apply(upd, updater, opGroup, replayPosition); } } } @@ -408,30 +406,21 @@ public class Keyspace /** * @param key row to index - * @param cfs ColumnFamily to index row in + * @param cfs ColumnFamily to index partition in * @param idxNames columns to index, in comparator order */ - public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames) + public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames) { if (logger.isDebugEnabled()) - logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); + logger.debug("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey())); - try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) - { - Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames); + Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, FBUtilities.nowInSeconds(), key); - Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE); - while (pager.hasNext()) - { - ColumnFamily cf = pager.next(); - ColumnFamily cf2 = cf.cloneMeShallow(); - for (Cell cell : cf) - { - if (cfs.indexManager.indexes(cell.name(), indexes)) - cf2.addColumn(cell); - } - cfs.indexManager.indexRow(key.getKey(), cf2, opGroup); - } + try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start(); + UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, opGroup)) + { + cfs.indexManager.indexPartition(partition, opGroup, indexes, cmd.nowInSec()); } }
