Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 9c0262529 -> b58686858
Re-apply MV updates on commitlog replay patch by tjake; reviewed by carlyeks for CASSANDRA-10164 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5868685 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5868685 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5868685 Branch: refs/heads/cassandra-3.0 Commit: b58686858c632ed642ccf355f1f3a588e28b0e8a Parents: 9c02625 Author: T Jake Luciani <[email protected]> Authored: Thu Aug 27 13:28:04 2015 -0400 Committer: T Jake Luciani <[email protected]> Committed: Tue Sep 1 15:30:20 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 29 +++++++++++------ .../db/commitlog/CommitLogReplayer.java | 13 +++++++- .../db/view/MaterializedViewBuilder.java | 13 +------- .../db/view/MaterializedViewManager.java | 8 ++--- .../apache/cassandra/service/StorageProxy.java | 33 +++++++------------- .../cassandra/streaming/StreamReceiveTask.java | 8 +++-- .../cassandra/cql3/MaterializedViewTest.java | 26 +++++++++++++++ 8 files changed, 79 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6539792..88b99a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta2 + * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164) * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901) * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554) * Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index f5a047f..981209c 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -386,7 +386,17 @@ public class Keyspace public void apply(Mutation mutation, boolean writeCommitLog) { - apply(mutation, writeCommitLog, true); + apply(mutation, writeCommitLog, true, false); + } + + public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) + { + apply(mutation, writeCommitLog, updateIndexes, false); + } + + public void applyFromCommitLog(Mutation mutation) + { + apply(mutation, false, true, true); } /** @@ -396,8 +406,9 @@ public class Keyspace * may happen concurrently, depending on the CL Executor type. * @param writeCommitLog false to disable commitlog append entirely * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") + * @param isClReplay true if caller is the commitlog replayer */ - public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes) + public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, boolean isClReplay) { if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); @@ -456,15 +467,15 @@ public class Keyspace { try { - Tracing.trace("Create materialized view mutations from replica"); - cfs.materializedViewManager.pushViewReplicaUpdates(upd); + Tracing.trace("Creating materialized view mutations from base table replica"); + cfs.materializedViewManager.pushViewReplicaUpdates(upd, !isClReplay); } - catch (Exception e) + catch (Throwable t) { - if (!(e instanceof WriteTimeoutException)) - logger.warn("Encountered exception when creating materialized view mutations", e); - - JVMStabilityInspector.inspectThrowable(e); + JVMStabilityInspector.inspectThrowable(t); + logger.error(String.format("Unknown exception caught while attempting to update MaterializedView! %s.%s", + upd.metadata().ksName, upd.metadata().cfName), t); + throw t; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 93c3026..4f50008 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -161,8 +161,19 @@ public class CommitLogReplayer // flush replayed keyspaces futures.clear(); + boolean flushingSystem = false; for (Keyspace keyspace : keyspacesRecovered) + { + if (keyspace.getName().equals(SystemKeyspace.NAME)) + flushingSystem = true; + futures.addAll(keyspace.flush()); + } + + // also flush batchlog incase of any MV updates + if (!flushingSystem) + futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush()); + FBUtilities.waitOnFutures(futures); return replayedCount.get(); } @@ -594,7 +605,7 @@ public class CommitLogReplayer if (newMutation != null) { assert !newMutation.isEmpty(); - Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); + Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation); keyspacesRecovered.add(keyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java index 6083634..e23fd84 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedViewBuilder.java @@ -95,18 +95,7 @@ public class MaterializedViewBuilder extends CompactionInfo.Holder Collection<Mutation> mutations = view.createMutations(partition, temporalRows, true); if (mutations != null) - { - try - { - StorageProxy.mutateMV(key.getKey(), mutations); - break; - } - catch (WriteTimeoutException ex) - { - NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES) - .warn("Encountered write timeout when building materialized view {}, the entries were stored in the batchlog and will be replayed at another time", view.name); - } - } + StorageProxy.mutateMV(key.getKey(), mutations, true); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java index e0cecf5..ac6a256 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedViewManager.java @@ -151,12 +151,8 @@ public class MaterializedViewManager * Calculates and pushes updates to the views replicas. The replicas are determined by * {@link MaterializedViewUtils#getViewNaturalEndpoint(String, Token, Token)}. */ - public void pushViewReplicaUpdates(PartitionUpdate update) throws UnavailableException, OverloadedException, WriteTimeoutException + public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog) { - // This happens when we are replaying from commitlog. In that case, we have already sent this commit off to the - // view node. - if (!StorageService.instance.isJoined()) return; - List<Mutation> mutations = null; TemporalRow.Set temporalRows = null; for (Map.Entry<String, MaterializedView> view : viewsByName.entrySet()) @@ -174,7 +170,7 @@ public class MaterializedViewManager } if (mutations != null) { - StorageProxy.mutateMV(update.partitionKey().getKey(), mutations); + StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 25789bb..4952959 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -652,8 +652,7 @@ public class StorageProxy implements StorageProxyMBean * * @param mutations the mutations to be applied across the replicas */ - public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations) - throws UnavailableException, OverloadedException, WriteTimeoutException + public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog) { Tracing.trace("Determining replicas for mutation"); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -693,7 +692,10 @@ public class StorageProxy implements StorageProxyMBean if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty()) { - mutation.apply(); + if (writeCommitLog) + mutation.apply(); + else + mutation.applyUnsafe(); } else { @@ -703,31 +705,18 @@ public class StorageProxy implements StorageProxyMBean if (!wrappers.isEmpty()) { + Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version); + //Apply to local batchlog memtable in this thread - BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version).apply(); + if (writeCommitLog) + blMutation.apply(); + else + blMutation.applyUnsafe(); // now actually perform the writes and wait for them to complete asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION); } } - catch (WriteTimeoutException ex) - { - mvWriteMetrics.timeouts.mark(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - throw ex; - } - catch (UnavailableException e) - { - mvWriteMetrics.unavailables.mark(); - Tracing.trace("Unavailable"); - throw e; - } - catch (OverloadedException e) - { - mvWriteMetrics.unavailables.mark(); - Tracing.trace("Overloaded"); - throw e; - } finally { mvWriteMetrics.addNano(System.nanoTime() - startTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 52c8884..cb99654 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -146,7 +146,7 @@ public class StreamReceiveTask extends StreamTask //We have a special path for Materialized view. //Since the MV requires cleaning up any pre-existing state, we must put //all partitions through the same write path as normal mutations. - //This also ensures any 2is are also updated + //This also ensures any 2i's are also updated if (hasMaterializedViews) { for (SSTableReader reader : readers) @@ -157,7 +157,8 @@ public class StreamReceiveTask extends StreamTask { try (UnfilteredRowIterator rowIterator = scanner.next()) { - new Mutation(PartitionUpdate.fromIterator(rowIterator)).apply(); + //Apply unsafe (we will flush below before transaction is done) + new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe(); } } } @@ -183,7 +184,10 @@ public class StreamReceiveTask extends StreamTask //We don't keep the streamed sstables since we've applied them manually //So we abort the txn and delete the streamed sstables if (hasMaterializedViews) + { + cfs.forceBlockingFlush(); task.txn.abort(); + } } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5868685/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java index daa68e9..7d08a8b 100644 --- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java @@ -292,6 +292,32 @@ public class MaterializedViewTest extends CQLTester } @Test + public void testBuilderWidePartition() throws Throwable + { + createTable("CREATE TABLE %s (" + + "k int, " + + "c int, " + + "intval int, " + + "PRIMARY KEY (k, c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + + for(int i = 0; i < 1024; i++) + execute("INSERT INTO %s (k, c, intval) VALUES (?, ?, ?)", 0, i, 0); + + createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL AND intval IS NOT NULL PRIMARY KEY (intval, c, k)"); + + + while (!SystemKeyspace.isViewBuilt(keyspace(), "mv")) + Thread.sleep(1000); + + assertRows(execute("SELECT count(*) from %s WHERE k = ?", 0), row(1024L)); + assertRows(execute("SELECT count(*) from mv WHERE intval = ?", 0), row(1024L)); + } + + @Test public void testRangeTombstone() throws Throwable { createTable("CREATE TABLE %s (" +
