[ 
https://issues.apache.org/jira/browse/CASSANDRA-12008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15392776#comment-15392776
 ] 

Kaide Mu edited comment on CASSANDRA-12008 at 7/27/16 9:54 AM:
---------------------------------------------------------------

Here's 3 operations I'm proposing for STREAMED_RANGES:
- updateStreamedRanges(String description, InetAddress endpoint, String 
keyspace, Set<Range<Token>> range)
- Set<Range<Token>> getAvailableRanges(String keyspace, InetAdress newReplica, 
IPartitioner partitioner)
- void resetAvailableRanges()

Briefly the flow is:
# StreamPlan.transferRanges(InetAddress to, InetAddress connecting, String 
keyspace, Collection<Range<Token>> ranges)
# StreamSession.addTransferRanges(String keyspace, Collection<Range<Token>> 
ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
# StreamSession.addTransferFiles(Collection<SSTableStreamingSections> 
sstableDetails)
# And finally StreamTransferTask.complete which will make use of 
{{updateStreamedRanges}}

As you can see {{updateStreamedRanges}} will need description, endpoint, 
keyspace and range, and when we reached 4, we can only obtain description and 
endpoint from {{StreamTransferTask.session}}, therefore some information is 
"lost" and {{updateStreamedRanges}} cannot be accomplished. Thus additionally I 
propose {{StreamTransferTask}} to receive keyspace and range, for that we may 
need to create a new {{addTransferFiles}} to keep a record of keyspace and 
transfer ranges.

WDYT? [~pauloricardomg] [~yukim]




was (Author: kdmu):
Here's 3 operations I'm proposing for STREAMED_RANGES:
- updateStreamedRanges(String description, InetAddress endpoint, String 
keyspace, Set<Range<Token>> range)
- MultiMap<Range<Token>> getAvailableRanges(String keyspace, IPartitioner 
partitioner)
- void resetAvailableRanges()

Briefly the flow is:
# StreamPlan.transferRanges(InetAddress to, InetAddress connecting, String 
keyspace, Collection<Range<Token>> ranges)
# StreamSession.addTransferRanges(String keyspace, Collection<Range<Token>> 
ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
# StreamSession.addTransferFiles(Collection<SSTableStreamingSections> 
sstableDetails)
# And finally StreamTransferTask.complete which will make use of 
{{updateStreamedRanges}}

As you can see {{updateStreamedRanges}} will need description, endpoint, 
keyspace and range, and when we reached 4, we can only obtain description and 
endpoint from {{StreamTransferTask.session}}, therefore some information is 
"lost" and {{updateStreamedRanges}} cannot be accomplished. Thus additionally I 
propose {{StreamTransferTask}} to receive keyspace and range, for that we may 
need to create a new {{addTransferFiles}} to keep a record of keyspace and 
transfer ranges.

WDYT? [~pauloricardomg] [~yukim]



> Make decommission operations resumable
> --------------------------------------
>
>                 Key: CASSANDRA-12008
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-12008
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Streaming and Messaging
>            Reporter: Tom van der Woerdt
>            Assignee: Kaide Mu
>            Priority: Minor
>
> We're dealing with large data sets (multiple terabytes per node) and 
> sometimes we need to add or remove nodes. These operations are very dependent 
> on the entire cluster being up, so while we're joining a new node (which 
> sometimes takes 6 hours or longer) a lot can go wrong and in a lot of cases 
> something does.
> It would be great if the ability to retry streams was implemented.
> Example to illustrate the problem :
> {code}
> 03:18 PM   ~ $ nodetool decommission
> error: Stream failed
> -- StackTrace --
> org.apache.cassandra.streaming.StreamException: Stream failed
>         at 
> org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
>         at com.google.common.util.concurrent.Futures$6.run(Futures.java:1310)
>         at 
> com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
>         at 
> com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
>         at 
> com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
>         at 
> com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
>         at 
> org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:210)
>         at 
> org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:186)
>         at 
> org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:430)
>         at 
> org.apache.cassandra.streaming.StreamSession.complete(StreamSession.java:622)
>         at 
> org.apache.cassandra.streaming.StreamSession.messageReceived(StreamSession.java:486)
>         at 
> org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:274)
>         at java.lang.Thread.run(Thread.java:745)
> 08:04 PM   ~ $ nodetool decommission
> nodetool: Unsupported operation: Node in LEAVING state; wait for status to 
> become normal or restart
> See 'nodetool help' or 'nodetool help <command>'.
> {code}
> Streaming failed, probably due to load :
> {code}
> ERROR [STREAM-IN-/<ipaddr>] 2016-06-14 18:05:47,275 StreamSession.java:520 - 
> [Stream #<streamid>] Streaming error occurred
> java.net.SocketTimeoutException: null
>         at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) 
> ~[na:1.8.0_77]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[na:1.8.0_77]
>         at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[na:1.8.0_77]
>         at 
> org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54)
>  ~[apache-cassandra-3.0.6.jar:3.0.6]
>         at 
> org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:268)
>  ~[apache-cassandra-3.0.6.jar:3.0.6]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
> {code}
> If implementing retries is not possible, can we have a 'nodetool decommission 
> resume'?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to