Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 d5f2d0f07 -> 76f175028
  refs/heads/cassandra-3.X f1b742e9d -> 541d83707
  refs/heads/trunk cd728d2e7 -> 9f75e7068


Split materialized view mutations on build to prevent OOM

Patch by Carl Yeksigian; reviewed by Jake Luciani for CASSANDRA-12268


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76f17502
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76f17502
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76f17502

Branch: refs/heads/cassandra-3.0
Commit: 76f175028544fe20f30ae873f23cba559097cef1
Parents: d5f2d0f
Author: Carl Yeksigian <c...@apache.org>
Authored: Wed Oct 12 12:24:19 2016 -0400
Committer: Carl Yeksigian <c...@apache.org>
Committed: Wed Oct 12 12:24:19 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/view/TableViews.java    | 91 +++++++++++++++++---
 .../apache/cassandra/db/view/ViewBuilder.java   | 14 +--
 .../cassandra/db/view/ViewUpdateGenerator.java  |  8 ++
 4 files changed, 95 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d797288..13800da 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.10
+ * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
  * mx4j does not work in 3.0.8 (CASSANDRA-12274)
  * Abort cqlsh copy-from in case of no answer after prolonged period of time 
(CASSANDRA-12740)
  * Avoid sstable corrupt exception due to dropped static column 
(CASSANDRA-12582)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java 
b/src/java/org/apache/cassandra/db/view/TableViews.java
index 7feb67c..1a3cbb1 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -46,7 +46,8 @@ public class TableViews extends AbstractCollection<View>
     private final CFMetaData baseTableMetadata;
 
     // We need this to be thread-safe, but the number of times this is changed 
(when a view is created in the keyspace)
-    // massively exceeds the number of time it's read (for every mutation on 
the keyspace), so a copy-on-write list is the best option.
+    // is massively exceeded by the number of times it's read (for every 
mutation on the keyspace), so a copy-on-write
+    // list is the best option.
     private final List<View> views = new CopyOnWriteArrayList();
 
     public TableViews(CFMetaData baseTableMetadata)
@@ -137,7 +138,7 @@ public class TableViews extends AbstractCollection<View>
              UnfilteredRowIterator existings = 
UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), 
command);
              UnfilteredRowIterator updates = update.unfilteredIterator())
         {
-            mutations = generateViewUpdates(views, updates, existings, 
nowInSec);
+            mutations = Iterators.getOnlyElement(generateViewUpdates(views, 
updates, existings, nowInSec, false));
         }
         
Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime()
 - start, TimeUnit.NANOSECONDS);
 
@@ -145,6 +146,7 @@ public class TableViews extends AbstractCollection<View>
             StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, 
writeCommitLog, baseComplete);
     }
 
+
     /**
      * Given some updates on the base table of this object and the existing 
values for the rows affected by that update, generates the
      * mutation to be applied to the provided views.
@@ -159,7 +161,11 @@ public class TableViews extends AbstractCollection<View>
      * @param nowInSec the current time in seconds.
      * @return the mutations to apply to the {@code views}. This can be empty.
      */
-    public Collection<Mutation> generateViewUpdates(Collection<View> views, 
UnfilteredRowIterator updates, UnfilteredRowIterator existings, int nowInSec)
+    public Iterator<Collection<Mutation>> generateViewUpdates(Collection<View> 
views,
+                                                              
UnfilteredRowIterator updates,
+                                                              
UnfilteredRowIterator existings,
+                                                              int nowInSec,
+                                                              boolean 
separateUpdates)
     {
         assert updates.metadata().cfId.equals(baseTableMetadata.cfId);
 
@@ -251,18 +257,75 @@ public class TableViews extends AbstractCollection<View>
                 addToViewUpdateGenerators(existingRow, 
emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), 
generators, nowInSec);
             }
         }
-        while (updatesIter.hasNext())
+
+        if (separateUpdates)
         {
-            Unfiltered update = updatesIter.next();
-            // If it's a range tombstone, it removes nothing pre-exisiting, so 
we can ignore it for view updates
-            if (update.isRangeTombstoneMarker())
-                continue;
+            final Collection<Mutation> firstBuild = 
buildMutations(baseTableMetadata, generators);
+
+            return new Iterator<Collection<Mutation>>()
+            {
+                // If the previous values are already empty, this update must 
be either empty or exclusively appending.
+                // In the case we are exclusively appending, we need to drop 
the build that was passed in and try to build a
+                // new first update instead.
+                // If there are no other updates, next will be null and the 
iterator will be empty.
+                Collection<Mutation> next = firstBuild.isEmpty()
+                                            ? buildNext()
+                                            : firstBuild;
+
+                private Collection<Mutation> buildNext()
+                {
+                    while (updatesIter.hasNext())
+                    {
+                        Unfiltered update = updatesIter.next();
+                        // If it's a range tombstone, it removes nothing 
pre-existing, so we can ignore it for view updates
+                        if (update.isRangeTombstoneMarker())
+                            continue;
+
+                        Row updateRow = (Row) update;
+                        
addToViewUpdateGenerators(emptyRow(updateRow.clustering(), DeletionTime.LIVE), 
updateRow, generators, nowInSec);
+
+                        // If the updates have been filtered, then we won't 
have any mutations; we need to make sure that we
+                        // only return if the mutations are non-empty. Otherwise, 
we continue to search for an update which is
+                        // not filtered
+                        Collection<Mutation> mutations = 
buildMutations(baseTableMetadata, generators);
+                        if (!mutations.isEmpty())
+                            return mutations;
+                    }
+
+                    return null;
+                }
+
+                public boolean hasNext()
+                {
+                    return next != null;
+                }
+
+                public Collection<Mutation> next()
+                {
+                    Collection<Mutation> mutations = next;
+
+                    next = buildNext();
 
-            Row updateRow = (Row)update;
-            addToViewUpdateGenerators(emptyRow(updateRow.clustering(), 
DeletionTime.LIVE), updateRow, generators, nowInSec);
+                    assert !mutations.isEmpty() : "Expected mutations to be 
non-empty";
+                    return mutations;
+                }
+            };
         }
+        else
+        {
+            while (updatesIter.hasNext())
+            {
+                Unfiltered update = updatesIter.next();
+                // If it's a range tombstone, it removes nothing 
pre-existing, so we can ignore it for view updates
+                if (update.isRangeTombstoneMarker())
+                    continue;
+
+                Row updateRow = (Row) update;
+                addToViewUpdateGenerators(emptyRow(updateRow.clustering(), 
DeletionTime.LIVE), updateRow, generators, nowInSec);
+            }
 
-        return buildMutations(baseTableMetadata, generators);
+            return 
Iterators.singletonIterator(buildMutations(baseTableMetadata, generators));
+        }
     }
 
     /**
@@ -425,10 +488,13 @@ public class TableViews extends AbstractCollection<View>
         // One view is probably common enough and we can optimize a bit easily
         if (generators.size() == 1)
         {
-            Collection<PartitionUpdate> updates = 
generators.get(0).generateViewUpdates();
+            ViewUpdateGenerator generator = generators.get(0);
+            Collection<PartitionUpdate> updates = 
generator.generateViewUpdates();
             List<Mutation> mutations = new ArrayList<>(updates.size());
             for (PartitionUpdate update : updates)
                 mutations.add(new Mutation(update));
+
+            generator.clear();
             return mutations;
         }
 
@@ -446,6 +512,7 @@ public class TableViews extends AbstractCollection<View>
                 }
                 mutation.add(update);
             }
+            generator.clear();
         }
         return mutations.values();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java 
b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index b55eda0..37c0e7b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.view;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -83,15 +84,15 @@ public class ViewBuilder extends CompactionInfo.Holder
         // and pretend that there is nothing pre-existing.
         UnfilteredRowIterator empty = 
UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, 
Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);
 
-        Collection<Mutation> mutations;
         try (ReadOrderGroup orderGroup = command.startOrderGroup();
              UnfilteredRowIterator data = 
UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), 
command))
         {
-            mutations = 
baseCfs.keyspace.viewManager.forTable(baseCfs.metadata).generateViewUpdates(Collections.singleton(view),
 data, empty, nowInSec);
-        }
+            Iterator<Collection<Mutation>> mutations = 
baseCfs.keyspace.viewManager
+                                                      
.forTable(baseCfs.metadata)
+                                                      
.generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);
 
-        if (!mutations.isEmpty())
-            StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
+            mutations.forEachRemaining(m -> 
StorageProxy.mutateMV(key.getKey(), m, true, noBase));
+        }
     }
 
     public void run()
@@ -166,8 +167,7 @@ public class ViewBuilder extends CompactionInfo.Holder
             }
 
             if (!isStopped)
-            SystemKeyspace.finishViewBuildStatus(ksname, viewName);
-
+                SystemKeyspace.finishViewBuildStatus(ksname, viewName);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76f17502/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java 
b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 3bdc380..edb88d0 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -145,6 +145,14 @@ public class ViewUpdateGenerator
     }
 
     /**
+     * Clears the current state so that the generator may be reused.
+     */
+    public void clear()
+    {
+        updates.clear();
+    }
+
+    /**
      * Compute which type of action needs to be performed to the view for a 
base table row
      * before and after an update.
      */

Reply via email to