[ https://issues.apache.org/jira/browse/CASSANDRA-11670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15506775#comment-15506775 ]

Paulo Motta edited comment on CASSANDRA-11670 at 9/20/16 2:49 PM:
------------------------------------------------------------------

I started implementing the new batchlog table modelling as discussed 
previously, modifying the batchlog write and replay paths accordingly to add 
mutations to the batchlog table and retrieve them iteratively, without caching 
them in memory. But in order to define the batch gcgs (gc grace seconds), all 
mutations need to be iterated, so there are 3 options here:
1. Calculate the gcgs on the write path, storing it as a batch attribute.
2. Calculate the gcgs on the replay path as done currently, by holding all 
mutations in memory and taking the minimum gcgs across them.
3. Calculate the gcgs on the replay path by iterating over the batch mutations 
once to retrieve the min gcgs, and again to replay them.

1 would require a change to the batch wire format, which cannot be done before 
4.0. 2 would probably move the current size limitation from disk to memory, 
which could be much more dangerous. 3 would not be very efficient, since for 
every batch (or at least for bigger batches) you would need to iterate over it 
twice during replay, as sketched below.
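
Roughly, option 3 would look like the following during replay (a minimal 
sketch with hypothetical helpers, not the actual {{BatchlogManager}} API):

{code:java}
import java.util.UUID;

// Minimal sketch of option 3 (hypothetical helpers): iterate the batch's
// mutations twice during replay, first to compute the minimum gcgs, then
// to replay them, so the mutations are never all held in memory at once.
public abstract class TwoPassReplaySketch
{
    interface Mutation { int smallestGcGraceSeconds(); }

    void replayBatch(UUID batchId)
    {
        // Pass 1: stream the mutations once to find the minimum gcgs.
        int minGcgs = Integer.MAX_VALUE;
        for (Mutation m : loadMutations(batchId))
            minGcgs = Math.min(minGcgs, m.smallestGcGraceSeconds());

        // Pass 2: stream the mutations again and replay them, now that the
        // batch's effective expiration (write time + min gcgs) is known.
        for (Mutation m : loadMutations(batchId))
            replay(m, minGcgs);
    }

    // Placeholders for iteratively reading the batch's mutations from the
    // batchlog table and for replaying a single mutation.
    abstract Iterable<Mutation> loadMutations(UUID batchId);
    abstract void replay(Mutation mutation, int gcgs);
}
{code}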

I think we could do 3 while waiting for 1 to become feasible, but while 
investigating an alternative I verified that the refactor of CASSANDRA-11475 
batches multiple mutations for the same partition key into a single mutation, 
which raises the max limit of mutations per MV batch from a few hundred rows 
(100) to a few thousand rows per partition key (3000) and already mitigates 
this problem considerably for ordinary MV updates.
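
To illustrate why that refactor helps (a simplified sketch with hypothetical 
types, not the actual CASSANDRA-11475 code): per-row view updates sharing a 
partition key are merged into a single mutation, so a batch carries one 
mutation per distinct partition key rather than one per row:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch: group per-row view updates by partition key; each map
// entry stands in for a single merged mutation containing all rows of that
// partition, so the batch's mutation count scales with partition keys.
public class MergeByPartitionSketch
{
    static final class RowUpdate
    {
        final String partitionKey;
        final String row;
        RowUpdate(String partitionKey, String row)
        {
            this.partitionKey = partitionKey;
            this.row = row;
        }
    }

    static Map<String, List<String>> mergeByPartition(List<RowUpdate> updates)
    {
        Map<String, List<String>> merged = new HashMap<>();
        for (RowUpdate u : updates)
            merged.computeIfAbsent(u.partitionKey, k -> new ArrayList<>()).add(u.row);
        return merged;
    }
}
{code}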

For MV updates originating from streaming, I noticed that we apply these 
mutations unsafely (skipping the commit log), since we flush before completing 
the stream session, but we don't propagate the {{writeCommitLog=false}} flag to 
{{pushViewReplicaUpdates}} due to the addition of the {{isClReplay}} flag by 
CASSANDRA-10164, so we only consider the latter flag. A very simple fix to 
avoid this problem during streaming (where it's most likely to happen) is to 
propagate the {{writeCommitLog=false}} flag to 
{{pushViewReplicaUpdates}} in {{Keyspace.apply}} by changing the original 
condition from {{!isClReplay}} to {{writeCommitLog && !isClReplay}}.
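
The change amounts to the following (a paraphrased sketch with simplified 
signatures; the real code lives in {{Keyspace.apply}} / {{ViewManager}}):

{code:java}
// Paraphrased sketch of the proposed one-line change in Keyspace.apply;
// the surrounding code is elided and the signatures are simplified.
public class ViewCommitLogGuardSketch
{
    interface ViewManager
    {
        void pushViewReplicaUpdates(Object update, boolean writeCommitLog);
    }

    void apply(ViewManager viewManager, Object update,
               boolean writeCommitLog, boolean isClReplay)
    {
        // Before: only commit-log replay skipped the commit log for view updates:
        //   viewManager.pushViewReplicaUpdates(update, !isClReplay);

        // After: also honor writeCommitLog=false, so view updates generated
        // while applying streamed mutations unsafely skip the commit log too.
        viewManager.pushViewReplicaUpdates(update, writeCommitLog && !isClReplay);
    }
}
{code}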

I added regression dtests with very wide partitions and submitted a [pull 
request|https://github.com/riptano/cassandra-dtest/pull/1331].

In order to further increase the maximum number of MV updates in a single batch 
for ordinary updates, I propose we pursue approach 1 above as an improvement 
after changing the messaging service version.
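
Roughly, approach 1 would look like this (a minimal sketch with hypothetical 
types; the stored gcgs would also need to be carried in the batch wire 
format, hence the messaging version dependency):

{code:java}
import java.util.List;

// Minimal sketch of approach 1: compute the minimum gcgs once on the write
// path and persist it as a batch attribute, so replay can read it directly
// and never needs to hold or re-iterate the mutations just to derive it.
public class GcgsAtWriteTimeSketch
{
    interface Mutation { int smallestGcGraceSeconds(); }

    static final class Batch
    {
        final List<Mutation> mutations;
        final int gcgs; // persisted with the batch, read directly on replay

        Batch(List<Mutation> mutations)
        {
            this.mutations = mutations;
            int min = Integer.MAX_VALUE;
            for (Mutation m : mutations)
                min = Math.min(min, m.smallestGcGraceSeconds());
            this.gcgs = min; // computed once, at write time
        }
    }
}
{code}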

Setting this to patch available. Below are the patches and tests with the 
proposed fix:
||3.0||trunk||dtest||
||[branch|https://github.com/apache/cassandra/compare/cassandra-3.0...pauloricardomg:3.0-11670]||[branch|https://github.com/apache/cassandra/compare/trunk...pauloricardomg:trunk-11670]||[branch|https://github.com/riptano/cassandra-dtest/compare/master...pauloricardomg:11670]||
||[testall|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-testall/lastCompletedBuild/testReport/]||[testall|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-trunk-11670-testall/lastCompletedBuild/testReport/]||
||[dtest|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-3.0-11670-dtest/lastCompletedBuild/testReport/]||[dtest|http://cassci.datastax.com/view/Dev/view/paulomotta/job/pauloricardomg-trunk-11670-dtest/lastCompletedBuild/testReport/]||



> Rebuilding or streaming MV generates mutations larger than 
> max_mutation_size_in_kb
> ----------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-11670
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11670
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Configuration, Streaming and Messaging
>            Reporter: Anastasia Osintseva
>            Assignee: Paulo Motta
>             Fix For: 3.0.x
>
>
> I have a cluster with 2 DCs, with 2 nodes in each DC. I wanted to add 1 node to 
> each DC. One node was added successfully after I had run scrubbing. 
> Now I'm trying to add a node to the other DC, but get the error: 
> org.apache.cassandra.streaming.StreamException: Stream failed. 
> After scrubbing and repair I get the same error.
> {noformat}
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 Keyspace.java:492 - 
> Unknown exception caught while attempting to update MaterializedView! 
> messages_dump.messages
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> StreamReceiveTask.java:214 - Error applying streamed data: 
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> ERROR [StreamReceiveTask:5] 2016-04-27 00:33:21,082 StreamSession.java:520 - 
> [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] Streaming error occurred
> java.lang.IllegalArgumentException: Mutation of 34974901 bytes is too large 
> for the maxiumum size of 33554432
>       at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:264) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:469) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.batchlog.BatchlogManager.store(BatchlogManager.java:146) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageProxy.mutateMV(StorageProxy.java:724) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.db.view.ViewManager.pushViewReplicaUpdates(ViewManager.java:149)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:487) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:384) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyFuture(Mutation.java:205) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.apply(Mutation.java:217) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at org.apache.cassandra.db.Mutation.applyUnsafe(Mutation.java:236) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:169)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
> DEBUG [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> ConnectionHandler.java:110 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Closing stream connection handler on /88.9.99.92
> DEBUG [STREAM-OUT-/88.9.99.92] 2016-04-27 00:33:21,082 
> ConnectionHandler.java:341 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Sending Session Failed
> INFO  [StreamReceiveTask:5] 2016-04-27 00:33:21,082 
> StreamResultFuture.java:182 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Session with /88.9.99.92 is complete
> WARN  [StreamReceiveTask:5] 2016-04-27 00:33:21,182 
> StreamResultFuture.java:209 - [Stream #f849ffe0-0bee-11e6-9b5f-d16a1b9764ab] 
> Stream failed
> ERROR [main] 2016-04-27 00:33:21,259 StorageService.java:1300 - Error while 
> waiting on bootstrap to complete. Bootstrap will have to be restarted.
> java.util.concurrent.ExecutionException: 
> org.apache.cassandra.streaming.StreamException: Stream failed
>       at 
> com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) 
> ~[guava-18.0.jar:na]
>       at 
> org.apache.cassandra.service.StorageService.bootstrap(StorageService.java:1295)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.joinTokenRing(StorageService.java:971)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.initServer(StorageService.java:745)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.StorageService.initServer(StorageService.java:610)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:333) 
> [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:551)
>  [apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:679) 
> [apache-cassandra-3.0.5.jar:3.0.5]
> Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
>       at 
> org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at com.google.common.util.concurrent.Futures$6.run(Futures.java:1310) 
> ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
>  ~[guava-18.0.jar:na]
>       at 
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
>  ~[guava-18.0.jar:na]
>       at 
> org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:210)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:186)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:430)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamSession.onError(StreamSession.java:525) 
> ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> org.apache.cassandra.streaming.StreamReceiveTask$OnCompletionRunnable.run(StreamReceiveTask.java:216)
>  ~[apache-cassandra-3.0.5.jar:3.0.5]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_11]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> ~[na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  ~[na:1.8.0_11]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ~[na:1.8.0_11]
>       at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_11]
> {noformat}
> I set commitlog_segment_size_in_mb: 128, but it didn't help. 


