http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java deleted file mode 100644 index 2ddc6ca..0000000 --- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db; - -import java.nio.ByteBuffer; -import java.util.*; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.RowIterator; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.partitions.*; -import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; - -public abstract class AbstractReadCommandBuilder -{ - protected final ColumnFamilyStore cfs; - protected int nowInSeconds; - - private int cqlLimit = -1; - private int pagingLimit = -1; - protected boolean reversed = false; - - protected Set<ColumnIdentifier> columns; - protected final RowFilter filter = RowFilter.create(); - - private Slice.Bound lowerClusteringBound; - private Slice.Bound upperClusteringBound; - - private NavigableSet<Clustering> clusterings; - - // Use Util.cmd() instead of this ctor directly - AbstractReadCommandBuilder(ColumnFamilyStore cfs) - { - this.cfs = cfs; - this.nowInSeconds = FBUtilities.nowInSeconds(); - } - - public AbstractReadCommandBuilder withNowInSeconds(int nowInSec) - { - this.nowInSeconds = nowInSec; - return this; - } - - public AbstractReadCommandBuilder fromIncl(Object... values) - { - assert lowerClusteringBound == null && clusterings == null; - this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values); - return this; - } - - public AbstractReadCommandBuilder fromExcl(Object... values) - { - assert lowerClusteringBound == null && clusterings == null; - this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values); - return this; - } - - public AbstractReadCommandBuilder toIncl(Object... values) - { - assert upperClusteringBound == null && clusterings == null; - this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values); - return this; - } - - public AbstractReadCommandBuilder toExcl(Object... values) - { - assert upperClusteringBound == null && clusterings == null; - this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values); - return this; - } - - public AbstractReadCommandBuilder includeRow(Object... values) - { - assert lowerClusteringBound == null && upperClusteringBound == null; - - if (this.clusterings == null) - this.clusterings = new TreeSet<>(cfs.metadata.comparator); - - this.clusterings.add(cfs.metadata.comparator.make(values)); - return this; - } - - public AbstractReadCommandBuilder reverse() - { - this.reversed = true; - return this; - } - - public AbstractReadCommandBuilder withLimit(int newLimit) - { - this.cqlLimit = newLimit; - return this; - } - - public AbstractReadCommandBuilder withPagingLimit(int newLimit) - { - this.pagingLimit = newLimit; - return this; - } - - public AbstractReadCommandBuilder columns(String... columns) - { - if (this.columns == null) - this.columns = new HashSet<>(); - - for (String column : columns) - this.columns.add(ColumnIdentifier.getInterned(column, true)); - return this; - } - - private ByteBuffer bb(Object value, AbstractType<?> type) - { - return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value); - } - - private AbstractType<?> forValues(AbstractType<?> collectionType) - { - assert collectionType instanceof CollectionType; - CollectionType ct = (CollectionType)collectionType; - switch (ct.kind) - { - case LIST: - case MAP: - return ct.valueComparator(); - case SET: - return ct.nameComparator(); - } - throw new AssertionError(); - } - - private AbstractType<?> forKeys(AbstractType<?> collectionType) - { - assert collectionType instanceof CollectionType; - CollectionType ct = (CollectionType)collectionType; - switch (ct.kind) - { - case LIST: - case MAP: - return ct.nameComparator(); - } - throw new AssertionError(); - } - - public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value) - { - ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true)); - assert def != null; - - AbstractType<?> type = def.type; - if (op == Operator.CONTAINS) - type = forValues(type); - else if (op == Operator.CONTAINS_KEY) - type = forKeys(type); - - this.filter.add(def, op, bb(value, type)); - return this; - } - - protected ColumnFilter makeColumnFilter() - { - if (columns == null || columns.isEmpty()) - return ColumnFilter.all(cfs.metadata); - - ColumnFilter.Builder filter = ColumnFilter.selectionBuilder(); - for (ColumnIdentifier column : columns) - filter.add(cfs.metadata.getColumnDefinition(column)); - return filter.build(); - } - - protected ClusteringIndexFilter makeFilter() - { - if (clusterings != null) - { - return new ClusteringIndexNamesFilter(clusterings, reversed); - } - else - { - Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound, - upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound); - return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed); - } - } - - protected DataLimits makeLimits() - { - DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit); - if (pagingLimit >= 0) - limits = limits.forPaging(pagingLimit); - return limits; - } - - public abstract ReadCommand build(); - - public static class SinglePartitionBuilder extends AbstractReadCommandBuilder - { - private final DecoratedKey partitionKey; - - public SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key) - { - super(cfs); - this.partitionKey = key; - } - - @Override - public ReadCommand build() - { - return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter()); - } - } - - public static class SinglePartitionSliceBuilder extends AbstractReadCommandBuilder - { - private final DecoratedKey partitionKey; - private Slices.Builder sliceBuilder; - - public SinglePartitionSliceBuilder(ColumnFamilyStore cfs, DecoratedKey key) - { - super(cfs); - this.partitionKey = key; - sliceBuilder = new Slices.Builder(cfs.getComparator()); - } - - public SinglePartitionSliceBuilder addSlice(Slice slice) - { - sliceBuilder.add(slice); - return this; - } - - @Override - protected ClusteringIndexFilter makeFilter() - { - return new ClusteringIndexSliceFilter(sliceBuilder.build(), reversed); - } - - @Override - public ReadCommand build() - { - return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter()); - } - } - - public static class PartitionRangeBuilder extends AbstractReadCommandBuilder - { - private DecoratedKey startKey; - private boolean startInclusive; - private DecoratedKey endKey; - private boolean endInclusive; - - public PartitionRangeBuilder(ColumnFamilyStore cfs) - { - super(cfs); - } - - public PartitionRangeBuilder fromKeyIncl(Object... values) - { - assert startKey == null; - this.startInclusive = true; - this.startKey = makeKey(cfs.metadata, values); - return this; - } - - public PartitionRangeBuilder fromKeyExcl(Object... values) - { - assert startKey == null; - this.startInclusive = false; - this.startKey = makeKey(cfs.metadata, values); - return this; - } - - public PartitionRangeBuilder toKeyIncl(Object... values) - { - assert endKey == null; - this.endInclusive = true; - this.endKey = makeKey(cfs.metadata, values); - return this; - } - - public PartitionRangeBuilder toKeyExcl(Object... values) - { - assert endKey == null; - this.endInclusive = false; - this.endKey = makeKey(cfs.metadata, values); - return this; - } - - @Override - public ReadCommand build() - { - PartitionPosition start = startKey; - if (start == null) - { - start = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); - startInclusive = false; - } - PartitionPosition end = endKey; - if (end == null) - { - end = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); - endInclusive = true; - } - - AbstractBounds<PartitionPosition> bounds; - if (startInclusive && endInclusive) - bounds = new Bounds<>(start, end); - else if (startInclusive && !endInclusive) - bounds = new IncludingExcludingBounds<>(start, end); - else if (!startInclusive && endInclusive) - bounds = new Range<>(start, end); - else - bounds = new ExcludingBounds<>(start, end); - - return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); - } - - static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey) - { - if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey) - return (DecoratedKey)partitionKey[0]; - - ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return StorageService.getPartitioner().decorateKey(key); - } - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 24da365..6ac132c 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -53,7 +53,6 @@ import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.db.view.MaterializedViewManager; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; @@ -160,7 +159,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private final AtomicInteger fileIndexGenerator = new AtomicInteger(0); public final SecondaryIndexManager indexManager; - public final MaterializedViewManager materializedViewManager; /* These are locally held copies to be changed from the config during runtime */ private volatile DefaultInteger minCompactionThreshold; @@ -197,7 +195,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean indexManager.reload(); - materializedViewManager.reload(); // If the CF comparator has changed, we need to change the memtable, // because the old one still aliases the previous comparator. if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator) @@ -334,7 +331,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.partitioner = partitioner; this.directories = directories; this.indexManager = new SecondaryIndexManager(this); - this.materializedViewManager = new MaterializedViewManager(this); this.metric = new TableMetrics(this); fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; @@ -455,7 +451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SystemKeyspace.removeTruncationRecord(metadata.cfId); data.dropSSTables(); indexManager.invalidate(); - materializedViewManager.invalidate(); invalidateCaches(); } @@ -577,10 +572,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // must be called after all sstables are loaded since row cache merges all row versions - public void init() + public void initRowCache() { - materializedViewManager.init(); - if (!isRowCacheEnabled()) return; @@ -1813,7 +1806,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean cfs.data.reset(); return null; } - }, true, false); + }, true); } } @@ -1841,16 +1834,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // flush the CF being truncated before forcing the new segment forceBlockingFlush(); - materializedViewManager.forceBlockingFlush(); - // sleep a little to make sure that our truncatedAt comes after any sstable // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); } else { - dumpMemtable(); - materializedViewManager.dumpMemtables(); + // just nuke the memtable data w/o writing to disk first + synchronized (data) + { + final Flush flush = new Flush(true); + flushExecutor.execute(flush); + postFlushExecutor.submit(flush.postFlush); + } } Runnable truncateRunnable = new Runnable() @@ -1870,32 +1866,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SecondaryIndex index : indexManager.getIndexes()) index.truncateBlocking(truncatedAt); - materializedViewManager.truncateBlocking(truncatedAt); - SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); logger.debug("cleaning out row cache"); invalidateCaches(); } }; - runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); + runWithCompactionsDisabled(Executors.callable(truncateRunnable), true); logger.debug("truncate complete"); } - /** - * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable. - */ - public void dumpMemtable() - { - synchronized (data) - { - final Flush flush = new Flush(true); - flushExecutor.execute(flush); - postFlushExecutor.submit(flush.postFlush); - } - } - - public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews) + public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation) { // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, // and so we only run one major compaction at a time @@ -1903,20 +1884,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { logger.debug("Cancelling in-progress compactions for {}", metadata.cfName); - Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews - ? Iterables.concat(concatWithIndexes(), materializedViewManager.allViewsCfs()) - : concatWithIndexes(); - - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes(); + for (ColumnFamilyStore cfs : selfWithIndexes) cfs.getCompactionStrategyManager().pause(); try { // interrupt in-progress compactions - CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation); - CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs); + CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation); + CompactionManager.instance.waitForCessation(selfWithIndexes); // doublecheck that we finished, instead of timing out - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : selfWithIndexes) { if (!cfs.getTracker().getCompacting().isEmpty()) { @@ -1938,7 +1916,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } finally { - for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) + for (ColumnFamilyStore cfs : selfWithIndexes) cfs.getCompactionStrategyManager().resume(); } } @@ -1958,7 +1936,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } }; - return runWithCompactionsDisabled(callable, false, false); + return runWithCompactionsDisabled(callable, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 78b593b..f37ce66 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -23,15 +23,13 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.commitlog.CommitLog; @@ -39,17 +37,14 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.db.view.MaterializedViewManager; -import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.metrics.KeyspaceMetrics; @@ -75,10 +70,7 @@ public class Keyspace } private volatile KeyspaceMetadata metadata; - - //OpOrder is defined globally since we need to order writes across - //Keyspaces in the case of MaterializedViews (batchlog of MV mutations) - public static final OpOrder writeOrder = new OpOrder(); + public final OpOrder writeOrder = new OpOrder(); /* ColumnFamilyStore per column family */ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>(); @@ -130,7 +122,7 @@ public class Keyspace // keyspace has to be constructed and in the cache before cacheRow can be called for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores()) - cfs.init(); + cfs.initRowCache(); } } } @@ -360,14 +352,10 @@ public class Keyspace // CFS being created for the first time, either on server startup or new CF being added. // We don't worry about races here; startup is safe, and adding multiple idential CFs // simultaneously is a "don't do that" scenario. - ColumnFamilyStore newCfs = ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables); - - ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, newCfs); + ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables)); // CFS mbean instantiation will error out before we hit this, but in case that changes... if (oldCfs != null) throw new IllegalStateException("added multiple mappings for cf id " + cfId); - - newCfs.init(); } else { @@ -392,41 +380,11 @@ public class Keyspace * @param writeCommitLog false to disable commitlog append entirely * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") */ - public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes) + public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); - Lock lock = null; - boolean requiresViewUpdate = updateIndexes && MaterializedViewManager.updatesAffectView(Collections.singleton(mutation), false); - - if (requiresViewUpdate) - { - lock = MaterializedViewManager.acquireLockFor(mutation.key().getKey()); - - if (lock == null) - { - if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) - { - logger.debug("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); - Tracing.trace("Could not acquire MV lock"); - throw new WriteTimeoutException(WriteType.MATERIALIZED_VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); - } - else - { - //This MV update can't happen right now. so rather than keep this thread busy - // we will re-apply ourself to the queue and try again later - StageManager.getStage(Stage.MUTATION).execute(() -> { - if (writeCommitLog) - mutation.apply(); - else - mutation.applyUnsafe(); - }); - - return; - } - } - } int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group opGroup = writeOrder.start()) { @@ -443,26 +401,10 @@ public class Keyspace ColumnFamilyStore cfs = columnFamilyStores.get(upd.metadata().cfId); if (cfs == null) { - logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId, upd.metadata().ksName, upd.metadata().cfName); + logger.error("Attempting to mutate non-existant table {}", upd.metadata().cfId); continue; } - if (requiresViewUpdate) - { - try - { - Tracing.trace("Create materialized view mutations from replica"); - cfs.materializedViewManager.pushViewReplicaUpdates(upd.partitionKey().getKey(), upd); - } - catch (Exception e) - { - if (!(e instanceof WriteTimeoutException)) - logger.warn("Encountered exception when creating materialized view mutations", e); - - JVMStabilityInspector.inspectThrowable(e); - } - } - Tracing.trace("Adding to {} memtable", upd.metadata().cfName); SecondaryIndexManager.Updater updater = updateIndexes ? cfs.indexManager.updaterFor(upd, opGroup, nowInSec) @@ -470,11 +412,6 @@ public class Keyspace cfs.apply(upd, updater, opGroup, replayPosition); } } - finally - { - if (lock != null) - lock.unlock(); - } } public AbstractReplicationStrategy getReplicationStrategy() http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index ace114b..3d49ca6 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -54,8 +54,6 @@ public class Mutation implements IMutation // map of column family id to mutations for that column family. private final Map<UUID, PartitionUpdate> modifications; - // Time at which this mutation was instantiated - public final long createdAt = System.currentTimeMillis(); public Mutation(String keyspaceName, DecoratedKey key) { this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/MutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java index 640e45f..3baa93e 100644 --- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java @@ -22,7 +22,6 @@ import java.io.IOError; import java.io.IOException; import java.net.InetAddress; -import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; @@ -48,17 +47,10 @@ public class MutationVerbHandler implements IVerbHandler<Mutation> replyTo = InetAddress.getByAddress(from); } - try - { message.payload.apply(); WriteResponse response = new WriteResponse(); Tracing.trace("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(response.createMessage(), id, replyTo); - } - catch (WriteTimeoutException wto) - { - Tracing.trace("Payload application resulted in WriteTimeout, not replying"); - } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 7ea946b..7bfd552 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -101,8 +101,6 @@ public final class SystemKeyspace public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; - public static final String MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS = "materializedviews_builds_in_progress"; - public static final String BUILT_MATERIALIZEDVIEWS = "built_materializedviews"; @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces"; @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies"; @@ -263,24 +261,6 @@ public final class SystemKeyspace + "ranges set<blob>," + "PRIMARY KEY ((keyspace_name)))"); - public static final CFMetaData MaterializedViewsBuildsInProgress = - compile(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS, - "materialized views builds current progress", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "last_token varchar," - + "generation_number int," - + "PRIMARY KEY ((keyspace_name), view_name))"); - - public static final CFMetaData BuiltMaterializedViews = - compile(BUILT_MATERIALIZEDVIEWS, - "built materialized views", - "CREATE TABLE \"%s\" (" - + "keyspace_name text," - + "view_name text," - + "PRIMARY KEY ((keyspace_name), view_name))"); - @Deprecated public static final CFMetaData LegacyKeyspaces = compile(LEGACY_KEYSPACES, @@ -421,8 +401,6 @@ public final class SystemKeyspace SSTableActivity, SizeEstimates, AvailableRanges, - MaterializedViewsBuildsInProgress, - BuiltMaterializedViews, LegacyKeyspaces, LegacyColumnfamilies, LegacyColumns, @@ -515,82 +493,6 @@ public final class SystemKeyspace return CompactionHistoryTabularData.from(queryResultSet); } - public static boolean isViewBuilt(String keyspaceName, String viewName) - { - String req = "SELECT view_name FROM %s.\"%s\" WHERE keyspace_name=? AND view_name=?"; - UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); - return !result.isEmpty(); - } - - public static void setMaterializedViewBuilt(String keyspaceName, String viewName) - { - String req = "INSERT INTO %s.\"%s\" (keyspace_name, view_name) VALUES (?, ?)"; - executeInternal(String.format(req, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); - forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); - } - - - public static void setMaterializedViewRemoved(String keyspaceName, String viewName) - { - String buildReq = "DELETE FROM %S.%s WHERE keyspace_name = ? AND view_name = ?"; - executeInternal(String.format(buildReq, NAME, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), keyspaceName, viewName); - forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS); - - String builtReq = "DELETE FROM %s.\"%s\" WHERE keyspace_name = ? AND view_name = ?"; - executeInternal(String.format(builtReq, NAME, BUILT_MATERIALIZEDVIEWS), keyspaceName, viewName); - forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); - } - - public static void beginMaterializedViewBuild(String ksname, String viewName, int generationNumber) - { - executeInternal(String.format("INSERT INTO system.%s (keyspace_name, view_name, generation_number) VALUES (?, ?, ?)", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), - ksname, - viewName, - generationNumber); - } - - public static void finishMaterializedViewBuildStatus(String ksname, String viewName) - { - // We flush the view built first, because if we fail now, we'll restart at the last place we checkpointed - // materialized view build. - // If we flush the delete first, we'll have to restart from the beginning. - // Also, if the build succeeded, but the materialized view build failed, we will be able to skip the - // materialized view build check next boot. - setMaterializedViewBuilt(ksname, viewName); - forceBlockingFlush(BUILT_MATERIALIZEDVIEWS); - executeInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ? AND view_name = ?", MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName); - forceBlockingFlush(MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS); - } - - public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token) - { - String req = "INSERT INTO system.%s (keyspace_name, view_name, last_token) VALUES (?, ?, ?)"; - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); - } - - public static Pair<Integer, Token> getMaterializedViewBuildStatus(String ksname, String viewName) - { - String req = "SELECT generation_number, last_token FROM system.%s WHERE keyspace_name = ? AND view_name = ?"; - UntypedResultSet queryResultSet = executeInternal(String.format(req, MATERIALIZEDVIEWS_BUILDS_IN_PROGRESS), ksname, viewName); - if (queryResultSet == null || queryResultSet.isEmpty()) - return null; - - UntypedResultSet.Row row = queryResultSet.one(); - - Integer generation = null; - Token lastKey = null; - if (row.has("generation_number")) - generation = row.getInt("generation_number"); - if (row.has("last_key")) - { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); - lastKey = factory.fromString(row.getString("last_key")); - } - - return Pair.create(generation, lastKey); - } - public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/WriteType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java index 20fb6a9..4f4c88d 100644 --- a/src/java/org/apache/cassandra/db/WriteType.java +++ b/src/java/org/apache/cassandra/db/WriteType.java @@ -24,6 +24,5 @@ public enum WriteType UNLOGGED_BATCH, COUNTER, BATCH_LOG, - CAS, - MATERIALIZED_VIEW; + CAS; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 3dd6f38..bf412d8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -58,7 +58,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.SecondaryIndexBuilder; -import org.apache.cassandra.db.view.MaterializedViewBuilder; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; @@ -1366,31 +1365,6 @@ public class CompactionManager implements CompactionManagerMBean } } - public Future<?> submitMaterializedViewBuilder(final MaterializedViewBuilder builder) - { - Runnable runnable = new Runnable() - { - public void run() - { - metrics.beginCompaction(builder); - try - { - builder.run(); - } - finally - { - metrics.finishCompaction(builder); - } - } - }; - if (executor.isShutdown()) - { - logger.info("Compaction executor has shut down, not submitting index build"); - return null; - } - - return executor.submit(runnable); - } public int getActiveCompactions() { return CompactionMetrics.getCompactions().size(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 5e15f33..766eb1b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -426,7 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer return tasks; } } - }, false, false); + }, false); } public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/compaction/OperationType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java index f8f016c..5b6ce05 100644 --- a/src/java/org/apache/cassandra/db/compaction/OperationType.java +++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java @@ -35,8 +35,7 @@ public enum OperationType VERIFY("Verify"), FLUSH("Flush"), STREAM("Stream"), - WRITE("Write"), - VIEW_BUILD("Materialized view build"); + WRITE("Write"); public final String type; public final String fileName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java deleted file mode 100644 index 082c71d..0000000 --- a/src/java/org/apache/cassandra/db/view/MaterializedView.java +++ /dev/null @@ -1,691 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.view; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import com.google.common.collect.Iterables; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.MaterializedViewDefinition; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.statements.CFProperties; -import org.apache.cassandra.db.AbstractReadCommandBuilder.SinglePartitionSliceBuilder; -import org.apache.cassandra.db.CBuilder; -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.LivenessInfo; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.ReadOrderGroup; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.Slice; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.BTreeBackedRow; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.ColumnData; -import org.apache.cassandra.db.rows.ComplexColumnData; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.RowIterator; -import org.apache.cassandra.service.pager.QueryPager; - -/** - * A Materialized View copies data from a base table into a view table which can be queried independently from the - * base. Every update which targets the base table must be fed through the {@link MaterializedViewManager} to ensure - * that if a view needs to be updated, the updates are properly created and fed into the view. - * - * This class does the job of translating the base row to the view row. - * - * It handles reading existing state and figuring out what tombstones need to be generated. - * - * createMutations below is the "main method" - * - */ -public class MaterializedView -{ - /** - * The columns should all be updated together, so we use this object as group. - */ - private static class MVColumns - { - //These are the base column definitions in terms of the *views* partitioning. - //Meaning we can see (for example) the partition key of the view contains a clustering key - //from the base table. - public final List<ColumnDefinition> partitionDefs; - public final List<ColumnDefinition> primaryKeyDefs; - public final List<ColumnDefinition> baseComplexColumns; - - private MVColumns(List<ColumnDefinition> partitionDefs, List<ColumnDefinition> primaryKeyDefs, List<ColumnDefinition> baseComplexColumns) - { - this.partitionDefs = partitionDefs; - this.primaryKeyDefs = primaryKeyDefs; - this.baseComplexColumns = baseComplexColumns; - } - } - - public final String name; - - private final ColumnFamilyStore baseCfs; - private ColumnFamilyStore _viewCfs = null; - - private MVColumns columns; - - private final boolean viewHasAllPrimaryKeys; - private final boolean includeAll; - private MaterializedViewBuilder builder; - - public MaterializedView(MaterializedViewDefinition definition, - ColumnFamilyStore baseCfs) - { - this.baseCfs = baseCfs; - - name = definition.viewName; - includeAll = definition.includeAll; - - viewHasAllPrimaryKeys = updateDefinition(definition); - } - - /** - * Lazily fetch the CFS instance for the view. - * We do this lazily to avoid initilization issues. - * - * @return The views CFS instance - */ - public ColumnFamilyStore getViewCfs() - { - if (_viewCfs == null) - _viewCfs = Keyspace.openAndGetStore(Schema.instance.getCFMetaData(baseCfs.keyspace.getName(), name)); - - return _viewCfs; - } - - - /** - * Lookup column definitions in the base table that correspond to the view columns (should be 1:1) - * - * Notify caller if all primary keys in the view are ALL primary keys in the base. We do this to simplify - * tombstone checks. - * - * @param columns a list of columns to lookup in the base table - * @param definitions lists to populate for the base table definitions - * @return true if all view PKs are also Base PKs - */ - private boolean resolveAndAddColumns(Iterable<ColumnIdentifier> columns, List<ColumnDefinition>... definitions) - { - boolean allArePrimaryKeys = true; - for (ColumnIdentifier identifier : columns) - { - ColumnDefinition cdef = baseCfs.metadata.getColumnDefinition(identifier); - assert cdef != null : "Could not resolve column " + identifier.toString(); - - for (List<ColumnDefinition> list : definitions) - { - list.add(cdef); - } - - allArePrimaryKeys = allArePrimaryKeys && cdef.isPrimaryKeyColumn(); - } - - return allArePrimaryKeys; - } - - /** - * This updates the columns stored which are dependent on the base CFMetaData. - * - * @return true if the view contains only columns which are part of the base's primary key; false if there is at - * least one column which is not. - */ - public boolean updateDefinition(MaterializedViewDefinition definition) - { - List<ColumnDefinition> partitionDefs = new ArrayList<>(definition.partitionColumns.size()); - List<ColumnDefinition> primaryKeyDefs = new ArrayList<>(definition.partitionColumns.size() - + definition.clusteringColumns.size()); - List<ColumnDefinition> baseComplexColumns = new ArrayList<>(); - - // We only add the partition columns to the partitions list, but both partition columns and clustering - // columns are added to the primary keys list - boolean partitionAllPrimaryKeyColumns = resolveAndAddColumns(definition.partitionColumns, primaryKeyDefs, partitionDefs); - boolean clusteringAllPrimaryKeyColumns = resolveAndAddColumns(definition.clusteringColumns, primaryKeyDefs); - - for (ColumnDefinition cdef : baseCfs.metadata.allColumns()) - { - if (cdef.isComplex()) - { - baseComplexColumns.add(cdef); - } - } - - this.columns = new MVColumns(partitionDefs, primaryKeyDefs, baseComplexColumns); - - return partitionAllPrimaryKeyColumns && clusteringAllPrimaryKeyColumns; - } - - /** - * Check to see if the update could possibly modify a view. Cases where the view may be updated are: - * <ul> - * <li>View selects all columns</li> - * <li>Update contains any range tombstones</li> - * <li>Update touches one of the columns included in the view</li> - * </ul> - * - * If the update contains any range tombstones, there is a possibility that it will not touch a range that is - * currently included in the view. - * - * @return true if {@param partition} modifies a column included in the view - */ - public boolean updateAffectsView(AbstractThreadUnsafePartition partition) - { - // If we are including all of the columns, then any update will be included - if (includeAll) - return true; - - // If there are range tombstones, tombstones will also need to be generated for the materialized view - // This requires a query of the base rows and generating tombstones for all of those values - if (!partition.deletionInfo().isLive()) - return true; - - // Check whether the update touches any of the columns included in the view - for (Row row : partition) - { - for (ColumnData data : row) - { - if (getViewCfs().metadata.getColumnDefinition(data.column().name) != null) - return true; - } - } - - return false; - } - - /** - * Creates the clustering columns for the view based on the specified row and resolver policy - * - * @param temporalRow The current row - * @param resolver The policy to use when selecting versions of cells use - * @return The clustering object to use for the view - */ - private Clustering viewClustering(TemporalRow temporalRow, TemporalRow.Resolver resolver) - { - CFMetaData viewCfm = getViewCfs().metadata; - int numViewClustering = viewCfm.clusteringColumns().size(); - CBuilder clustering = CBuilder.create(getViewCfs().getComparator()); - for (int i = 0; i < numViewClustering; i++) - { - ColumnDefinition definition = viewCfm.clusteringColumns().get(i); - clustering.add(temporalRow.clusteringValue(definition, resolver)); - } - - return clustering.build(); - } - - /** - * @return Mutation containing a range tombstone for a base partition key and TemporalRow. - */ - private PartitionUpdate createTombstone(TemporalRow temporalRow, - DecoratedKey partitionKey, - DeletionTime deletionTime, - TemporalRow.Resolver resolver, - int nowInSec) - { - CFMetaData viewCfm = getViewCfs().metadata; - Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); - builder.newRow(viewClustering(temporalRow, resolver)); - builder.addRowDeletion(deletionTime); - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); - } - - /** - * @return PartitionUpdate containing a complex tombstone for a TemporalRow, and the collection's column identifier. - */ - private PartitionUpdate createComplexTombstone(TemporalRow temporalRow, - DecoratedKey partitionKey, - ColumnDefinition deletedColumn, - DeletionTime deletionTime, - TemporalRow.Resolver resolver, - int nowInSec) - { - - CFMetaData viewCfm = getViewCfs().metadata; - Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); - builder.newRow(viewClustering(temporalRow, resolver)); - builder.addComplexDeletion(deletedColumn, deletionTime); - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); - } - - /** - * @return View's DecoratedKey or null, if one of the view's primary key components has an invalid resolution from - * the TemporalRow and its Resolver - */ - private DecoratedKey viewPartitionKey(TemporalRow temporalRow, TemporalRow.Resolver resolver) - { - List<ColumnDefinition> partitionDefs = this.columns.partitionDefs; - Object[] partitionKey = new Object[partitionDefs.size()]; - - for (int i = 0; i < partitionKey.length; i++) - { - ByteBuffer value = temporalRow.clusteringValue(partitionDefs.get(i), resolver); - - if (value == null) - return null; - - partitionKey[i] = value; - } - - return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata - .getKeyValidatorAsClusteringComparator() - .make(partitionKey))); - } - - /** - * @return mutation which contains the tombstone for the referenced TemporalRow, or null if not necessary. - * TemporalRow's can reference at most one view row; there will be at most one row to be tombstoned, so only one - * mutation is necessary - */ - private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow) - { - // Primary Key and Clustering columns do not generate tombstones - if (viewHasAllPrimaryKeys) - return null; - - boolean hasUpdate = false; - List<ColumnDefinition> primaryKeyDefs = this.columns.primaryKeyDefs; - for (ColumnDefinition viewPartitionKeys : primaryKeyDefs) - { - if (!viewPartitionKeys.isPrimaryKeyColumn() && temporalRow.clusteringValue(viewPartitionKeys, TemporalRow.oldValueIfUpdated) != null) - hasUpdate = true; - } - - if (!hasUpdate) - return null; - - TemporalRow.Resolver resolver = TemporalRow.earliest; - return createTombstone(temporalRow, - viewPartitionKey(temporalRow, resolver), - new DeletionTime(temporalRow.viewClusteringTimestamp(), temporalRow.nowInSec), - resolver, - temporalRow.nowInSec); - } - - /** - * @return Mutation which is the transformed base table mutation for the materialized view. - */ - private PartitionUpdate createUpdatesForInserts(TemporalRow temporalRow) - { - TemporalRow.Resolver resolver = TemporalRow.latest; - - DecoratedKey partitionKey = viewPartitionKey(temporalRow, resolver); - ColumnFamilyStore viewCfs = getViewCfs(); - - if (partitionKey == null) - { - // Not having a partition key means we aren't updating anything - return null; - } - - Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec); - - CBuilder clustering = CBuilder.create(viewCfs.getComparator()); - for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++) - { - clustering.add(temporalRow.clusteringValue(viewCfs.metadata.clusteringColumns().get(i), resolver)); - } - regularBuilder.newRow(clustering.build()); - regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(viewCfs.metadata, - temporalRow.viewClusteringTimestamp(), - temporalRow.viewClusteringTtl(), - temporalRow.viewClusteringLocalDeletionTime())); - - for (ColumnDefinition columnDefinition : viewCfs.metadata.allColumns()) - { - if (columnDefinition.isPrimaryKeyColumn()) - continue; - - for (Cell cell : temporalRow.values(columnDefinition, resolver)) - { - regularBuilder.addCell(cell); - } - } - - return PartitionUpdate.singleRowUpdate(viewCfs.metadata, partitionKey, regularBuilder.build()); - } - - /** - * @param partition Update which possibly contains deletion info for which to generate view tombstones. - * @return View Tombstones which delete all of the rows which have been removed from the base table with - * {@param partition} - */ - private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition) - { - final TemporalRow.Resolver resolver = TemporalRow.earliest; - - DeletionInfo deletionInfo = partition.deletionInfo(); - - List<Mutation> mutations = new ArrayList<>(); - - // Check the complex columns to see if there are any which may have tombstones we need to create for the view - if (!columns.baseComplexColumns.isEmpty()) - { - for (Row row : partition) - { - if (!row.hasComplexDeletion()) - continue; - - TemporalRow temporalRow = rowSet.getClustering(row.clustering()); - - assert temporalRow != null; - - for (ColumnDefinition definition : columns.baseComplexColumns) - { - ComplexColumnData columnData = row.getComplexColumnData(definition); - - if (columnData != null) - { - DeletionTime time = columnData.complexDeletion(); - if (!time.isLive()) - { - DecoratedKey targetKey = viewPartitionKey(temporalRow, resolver); - if (targetKey != null) - mutations.add(new Mutation(createComplexTombstone(temporalRow, targetKey, definition, time, resolver, temporalRow.nowInSec))); - } - } - } - } - } - - ReadCommand command = null; - - if (!deletionInfo.isLive()) - { - // We have to generate tombstones for all of the affected rows, but we don't have the information in order - // to create them. This requires that we perform a read for the entire range that is being tombstoned, and - // generate a tombstone for each. This may be slow, because a single range tombstone can cover up to an - // entire partition of data which is not distributed on a single partition node. - DecoratedKey dk = rowSet.dk; - - if (deletionInfo.hasRanges()) - { - SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, dk); - Iterator<RangeTombstone> tombstones = deletionInfo.rangeIterator(false); - while (tombstones.hasNext()) - { - RangeTombstone tombstone = tombstones.next(); - - builder.addSlice(tombstone.deletedSlice()); - } - - command = builder.build(); - } - else - { - command = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, rowSet.nowInSec, dk); - } - } - - if (command == null) - { - SinglePartitionSliceBuilder builder = null; - for (Row row : partition) - { - if (!row.deletion().isLive()) - { - if (builder == null) - builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); - builder.addSlice(Slice.make(row.clustering())); - } - } - - if (builder != null) - command = builder.build(); - } - - if (command != null) - { - QueryPager pager = command.getPager(null); - - // Add all of the rows which were recovered from the query to the row set - while (!pager.isExhausted()) - { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) - { - if (!iter.hasNext()) - break; - - try (RowIterator rowIterator = iter.next()) - { - while (rowIterator.hasNext()) - { - Row row = rowIterator.next(); - rowSet.addRow(row, false); - } - } - } - } - - // If the temporal row has been deleted by the deletion info, we generate the corresponding range tombstone - // for the view. - for (TemporalRow temporalRow : rowSet) - { - DeletionTime deletionTime = temporalRow.deletionTime(partition); - if (!deletionTime.isLive()) - { - DecoratedKey value = viewPartitionKey(temporalRow, resolver); - if (value != null) - { - PartitionUpdate update = createTombstone(temporalRow, value, deletionTime, resolver, temporalRow.nowInSec); - if (update != null) - mutations.add(new Mutation(update)); - } - } - } - } - - return !mutations.isEmpty() ? mutations : null; - } - - /** - * Read and update temporal rows in the set which have corresponding values stored on the local node - */ - private void readLocalRows(TemporalRow.Set rowSet) - { - SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk); - - for (TemporalRow temporalRow : rowSet) - builder.addSlice(temporalRow.baseSlice()); - - QueryPager pager = builder.build().getPager(null); - - while (!pager.isExhausted()) - { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator iter = pager.fetchPageInternal(128, orderGroup)) - { - while (iter.hasNext()) - { - try (RowIterator rows = iter.next()) - { - while (rows.hasNext()) - { - rowSet.addRow(rows.next(), false); - } - } - } - } - } - } - - /** - * @return Set of rows which are contained in the partition update {@param partition} - */ - private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition) - { - Set<ColumnIdentifier> columns = new HashSet<>(); - for (ColumnDefinition def : this.columns.primaryKeyDefs) - columns.add(def.name); - - TemporalRow.Set rowSet = new TemporalRow.Set(baseCfs, columns, key); - for (Row row : partition) - rowSet.addRow(row, true); - - return rowSet; - } - - /** - * @param isBuilding If the view is currently being built, we do not query the values which are already stored, - * since all of the update will already be present in the base table. - * @return View mutations which represent the changes necessary as long as previously created mutations for the view - * have been applied successfully. This is based solely on the changes that are necessary given the current - * state of the base table and the newly applying partition data. - */ - public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding) - { - if (!updateAffectsView(partition)) - return null; - - TemporalRow.Set rowSet = separateRows(key, partition); - - // If we are building the view, we do not want to add old values; they will always be the same - if (!isBuilding) - readLocalRows(rowSet); - - Collection<Mutation> mutations = null; - for (TemporalRow temporalRow : rowSet) - { - // If we are building, there is no need to check for partition tombstones; those values will not be present - // in the partition data - if (!isBuilding) - { - PartitionUpdate partitionTombstone = createRangeTombstoneForRow(temporalRow); - if (partitionTombstone != null) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.add(new Mutation(partitionTombstone)); - } - } - - PartitionUpdate insert = createUpdatesForInserts(temporalRow); - if (insert != null) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.add(new Mutation(insert)); - } - } - - if (!isBuilding) - { - Collection<Mutation> deletion = createForDeletionInfo(rowSet, partition); - if (deletion != null && !deletion.isEmpty()) - { - if (mutations == null) mutations = new LinkedList<>(); - mutations.addAll(deletion); - } - } - - return mutations; - } - - public synchronized void build() - { - if (this.builder != null) - { - this.builder.stop(); - this.builder = null; - } - - this.builder = new MaterializedViewBuilder(baseCfs, this); - CompactionManager.instance.submitMaterializedViewBuilder(builder); - } - - /** - * @return CFMetaData which represents the definition given - */ - public static CFMetaData getCFMetaData(MaterializedViewDefinition definition, - CFMetaData baseCf, - CFProperties properties) - { - CFMetaData.Builder viewBuilder = CFMetaData.Builder - .createView(baseCf.ksName, definition.viewName); - - ColumnDefinition nonPkTarget = null; - - for (ColumnIdentifier targetIdentifier : definition.partitionColumns) - { - ColumnDefinition target = baseCf.getColumnDefinition(targetIdentifier); - if (!target.isPartitionKey()) - nonPkTarget = target; - - viewBuilder.addPartitionKey(target.name, properties.getReversableType(targetIdentifier, target.type)); - } - - Collection<ColumnDefinition> included = new ArrayList<>(); - for(ColumnIdentifier identifier : definition.included) - { - ColumnDefinition cfDef = baseCf.getColumnDefinition(identifier); - assert cfDef != null; - included.add(cfDef); - } - - boolean includeAll = included.isEmpty(); - - for (ColumnIdentifier ident : definition.clusteringColumns) - { - ColumnDefinition column = baseCf.getColumnDefinition(ident); - viewBuilder.addClusteringColumn(ident, properties.getReversableType(ident, column.type)); - } - - for (ColumnDefinition column : baseCf.partitionColumns().regulars.columns) - { - if (column != nonPkTarget && (includeAll || included.contains(column))) - { - viewBuilder.addRegularColumn(column.name, column.type); - } - } - - //Add any extra clustering columns - for (ColumnDefinition column : Iterables.concat(baseCf.partitionKeyColumns(), baseCf.clusteringColumns())) - { - if ( (!definition.partitionColumns.contains(column.name) && !definition.clusteringColumns.contains(column.name)) && - (includeAll || included.contains(column)) ) - { - viewBuilder.addRegularColumn(column.name, column.type); - } - } - - CFMetaData cfm = viewBuilder.build(); - properties.properties.applyToCFMetadata(cfm); - - return cfm; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java deleted file mode 100644 index e8842ed..0000000 --- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.view; - -import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.concurrent.ScheduledExecutors; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.ReadOrderGroup; -import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.compaction.CompactionInfo; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.db.partitions.FilteredPartition; -import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.rows.RowIterator; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.concurrent.Refs; - -public class MaterializedViewBuilder extends CompactionInfo.Holder -{ - private final ColumnFamilyStore baseCfs; - private final MaterializedView view; - private final UUID compactionId; - private volatile Token prevToken = null; - - private static final Logger logger = LoggerFactory.getLogger(MaterializedViewBuilder.class); - - private volatile boolean isStopped = false; - - public MaterializedViewBuilder(ColumnFamilyStore baseCfs, MaterializedView view) - { - this.baseCfs = baseCfs; - this.view = view; - compactionId = UUIDGen.getTimeUUID(); - } - - private void buildKey(DecoratedKey key) - { - QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null); - - while (!pager.isExhausted()) - { - try (ReadOrderGroup orderGroup = pager.startOrderGroup(); - PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup)) - { - if (!partitionIterator.hasNext()) - return; - - try (RowIterator rowIterator = partitionIterator.next()) - { - Collection<Mutation> mutations = view.createMutations(key.getKey(), FilteredPartition.create(rowIterator), true); - - if (mutations != null) - { - try - { - StorageProxy.mutateMV(key.getKey(), mutations); - break; - } - catch (WriteTimeoutException ex) - { - NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES) - .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name); - } - } - } - } - } - } - - public void run() - { - String ksname = baseCfs.metadata.ksName, viewName = view.name; - - if (SystemKeyspace.isViewBuilt(ksname, viewName)) - return; - - Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName); - final Pair<Integer, Token> buildStatus = SystemKeyspace.getMaterializedViewBuildStatus(ksname, viewName); - Token lastToken; - Function<View, Iterable<SSTableReader>> function; - if (buildStatus == null) - { - baseCfs.forceBlockingFlush(); - function = View.select(SSTableSet.CANONICAL); - int generation = Integer.MIN_VALUE; - - try (Refs<SSTableReader> temp = baseCfs.selectAndReference(function).refs) - { - for (SSTableReader reader : temp) - { - generation = Math.max(reader.descriptor.generation, generation); - } - } - - SystemKeyspace.beginMaterializedViewBuild(ksname, viewName, generation); - lastToken = null; - } - else - { - function = new Function<View, Iterable<SSTableReader>>() - { - @Nullable - public Iterable<SSTableReader> apply(View view) - { - Iterable<SSTableReader> readers = View.select(SSTableSet.CANONICAL).apply(view); - if (readers != null) - return Iterables.filter(readers, ssTableReader -> ssTableReader.descriptor.generation <= buildStatus.left); - return null; - } - }; - lastToken = buildStatus.right; - } - - prevToken = lastToken; - try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs; - ReducingKeyIterator iter = new ReducingKeyIterator(sstables)) - { - while (!isStopped && iter.hasNext()) - { - DecoratedKey key = iter.next(); - Token token = key.getToken(); - if (lastToken == null || lastToken.compareTo(token) < 0) - { - for (Range<Token> range : ranges) - { - if (range.contains(token)) - { - buildKey(key); - - if (prevToken == null || prevToken.compareTo(token) != 0) - { - SystemKeyspace.updateMaterializedViewBuildStatus(ksname, viewName, key.getToken()); - prevToken = token; - } - } - } - lastToken = null; - } - } - - SystemKeyspace.finishMaterializedViewBuildStatus(ksname, viewName); - - } - catch (Exception e) - { - final MaterializedViewBuilder builder = new MaterializedViewBuilder(baseCfs, view); - ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitMaterializedViewBuilder(builder), - 5, - TimeUnit.MINUTES); - logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e); - } - } - - public CompactionInfo getCompactionInfo() - { - long rangesLeft = 0, rangesTotal = 0; - Token lastToken = prevToken; - - // This approximation is not very accurate, but since we do not have a method which allows us to calculate the - // percentage of a range covered by a second range, this is the best approximation that we can calculate. - // Instead, we just count the total number of ranges that haven't been seen by the node (we use the order of - // the tokens to determine whether they have been seen yet or not), and the total number of ranges that a node - // has. - for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName())) - { - rangesLeft++; - rangesTotal++; - // This will reset rangesLeft, so that the number of ranges left will be less than the total ranges at the - // end of the method. - if (lastToken == null || range.contains(lastToken)) - rangesLeft = 0; - } - return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesLeft, rangesTotal, "ranges", compactionId); - } - - public void stop() - { - isStopped = true; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java deleted file mode 100644 index 7f97728..0000000 --- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.view; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.locks.Lock; - -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Striped; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.MaterializedViewDefinition; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.IMutation; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.OverloadedException; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; - -/** - * Manages {@link MaterializedView}'s for a single {@link ColumnFamilyStore}. All of the materialized views for that - * table are created when this manager is initialized. - * - * The main purposes of the manager are to provide a single location for updates to be vetted to see whether they update - * any views {@link MaterializedViewManager#updateAffectsView(PartitionUpdate)}, provide locks to prevent multiple - * updates from creating incoherent updates in the view {@link MaterializedViewManager#acquireLockFor(ByteBuffer)}, and - * to affect change on the view. - */ -public class MaterializedViewManager -{ - private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024); - - private final ConcurrentNavigableMap<String, MaterializedView> viewsByName; - - private final ColumnFamilyStore baseCfs; - - public MaterializedViewManager(ColumnFamilyStore baseCfs) - { - this.viewsByName = new ConcurrentSkipListMap<>(); - - this.baseCfs = baseCfs; - } - - public Iterable<MaterializedView> allViews() - { - return viewsByName.values(); - } - - public Iterable<ColumnFamilyStore> allViewsCfs() - { - List<ColumnFamilyStore> viewColumnFamilies = new ArrayList<>(); - for (MaterializedView view : allViews()) - viewColumnFamilies.add(view.getViewCfs()); - return viewColumnFamilies; - } - - public void init() - { - reload(); - } - - public void invalidate() - { - for (MaterializedView view : allViews()) - removeMaterializedView(view.name); - } - - public void reload() - { - Map<String, MaterializedViewDefinition> newViewsByName = new HashMap<>(); - for (MaterializedViewDefinition definition : baseCfs.metadata.getMaterializedViews()) - { - newViewsByName.put(definition.viewName, definition); - } - - for (String viewName : viewsByName.keySet()) - { - if (!newViewsByName.containsKey(viewName)) - removeMaterializedView(viewName); - } - - for (Map.Entry<String, MaterializedViewDefinition> entry : newViewsByName.entrySet()) - { - if (!viewsByName.containsKey(entry.getKey())) - addMaterializedView(entry.getValue()); - } - - for (MaterializedView view : allViews()) - { - view.build(); - // We provide the new definition from the base metadata - view.updateDefinition(newViewsByName.get(view.name)); - } - } - - public void buildAllViews() - { - for (MaterializedView view : allViews()) - view.build(); - } - - public void removeMaterializedView(String name) - { - MaterializedView view = viewsByName.remove(name); - - if (view == null) - return; - - SystemKeyspace.setMaterializedViewRemoved(baseCfs.metadata.ksName, view.name); - } - - public void addMaterializedView(MaterializedViewDefinition definition) - { - MaterializedView view = new MaterializedView(definition, baseCfs); - - viewsByName.put(definition.viewName, view); - } - - /** - * Calculates and pushes updates to the views replicas. The replicas are determined by - * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}. - */ - public void pushViewReplicaUpdates(ByteBuffer key, PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException - { - // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the - // view node. - if (!StorageService.instance.isJoined()) return; - - List<Mutation> mutations = null; - for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet()) - { - Collection<Mutation> viewMutations = view.getValue().createMutations(key, update, false); - if (viewMutations != null && !viewMutations.isEmpty()) - { - if (mutations == null) - mutations = Lists.newLinkedList(); - mutations.addAll(viewMutations); - } - } - if (mutations != null) - { - StorageProxy.mutateMV(key, mutations); - } - } - - public boolean updateAffectsView(PartitionUpdate upd) - { - for (MaterializedView view : allViews()) - { - if (view.updateAffectsView(upd)) - return true; - } - return false; - } - - public static Lock acquireLockFor(ByteBuffer key) - { - Lock lock = LOCKS.get(key); - - if (lock.tryLock()) - return lock; - - return null; - } - - public static boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean ignoreRf1) - { - for (IMutation mutation : mutations) - { - for (PartitionUpdate cf : mutation.getPartitionUpdates()) - { - Keyspace keyspace = Keyspace.open(cf.metadata().ksName); - - if (ignoreRf1 && keyspace.getReplicationStrategy().getReplicationFactor() == 1) - continue; - - MaterializedViewManager viewManager = keyspace.getColumnFamilyStore(cf.metadata().cfId).materializedViewManager; - if (viewManager.updateAffectsView(cf)) - return true; - } - } - - return false; - } - - - public void forceBlockingFlush() - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - viewCfs.forceBlockingFlush(); - } - - public void dumpMemtables() - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - viewCfs.dumpMemtable(); - } - - public void truncateBlocking(long truncatedAt) - { - for (ColumnFamilyStore viewCfs : allViewsCfs()) - { - ReplayPosition replayAfter = viewCfs.discardSSTables(truncatedAt); - SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter); - } - } -}
