[ 
https://issues.apache.org/jira/browse/CASSANDRA-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14216261#comment-14216261
 ] 

Rajanarayanan Thottuvaikkatumana commented on CASSANDRA-7124:
-------------------------------------------------------------

[~yukim], Thanks for the suggestions. Here is the implementation strategy I 
have chosen for the cleanup operation after looking at the repair job example. 
1) From the CompactionManager started returning ListenableFuture (all are 
separate methods to avoid regression)
2) In the StorageService, implemented async methods that sends JMX messages 
3) Here is the flow of control from StorageService.java In the call chain, 
wherever possible, new async versions have been added without touching the 
existing methods

Please have a look at it and if you think that this is the right thing to do, I 
can upload the patch.
{code}
    public int forceKeyspaceASyncCleanup(String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
    {
        if (keyspaceName.equals(SystemKeyspace.NAME))
            throw new RuntimeException("Cleanup of the system keyspace is 
neither necessary nor wise");

        int cmd = nextRepairCommand.incrementAndGet();
        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
keyspaceName, columnFamilies))
        {
                new Thread(createCleanupTask(cmd, keyspaceName, 
cfStore)).start();
                
        }
        return cmd;
    }
    
    
    private FutureTask<Object> createCleanupTask(final int cmd, final String 
keyspace, final ColumnFamilyStore cfStore)
    {
        return new FutureTask<>(new WrappedRunnable()
        {
            protected void runMayThrow() throws Exception
            {
                Iterable<SSTableReader> compactingSSTables = 
cfStore.markAllCompacting();
                if (compactingSSTables == null){
                        logger.info("Aborting operation on {}.{} after failing 
to interrupt other compaction operations", cfStore.keyspace.getName(), 
cfStore.name);
                        return;
                }
                if (Iterables.isEmpty(compactingSSTables))
                {
                    logger.info("No sstables for {}.{}", 
cfStore.keyspace.getName(), cfStore.name);
                    return;
                }               
                String message = String.format("Starting cleanup command #%d, 
cleaning up keyspace %s with column family store %s", cmd, keyspace, 
cfStore.name);
                logger.info(message);
                sendNotification("cleanup", message, new int[]{cmd, 
ActiveRepairService.Status.STARTED.ordinal()});
                                List<ListenableFuture<Object>> futures = 
cfStore.forceAsyncCleanup();
                                for(final ListenableFuture<Object> future: 
futures)
                                {
                                        Futures.addCallback(future, new 
FutureCallback<Object>()
                                        {
                                          public void onFailure(Throwable 
thrown) 
                                          {
                                                String message = "Failed 
cleanup job " + future.toString() + "with exception: " + thrown.getMessage();
                                                logger.info(message);  
                                            sendNotification("cleanup", 
message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                                          }     
                                          public void onSuccess(Object future) 
                                          {     
                                                String message = "Cleanup 
Session: " + future.toString() ;
                                                logger.info(message);
                                                sendNotification("cleanup", 
message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()});
                                          }                                     
          
                                        });
                                        future.get();
                                }
                                
cfStore.getDataTracker().unmarkCompacting(compactingSSTables);
                                message = String.format("Ending cleanup command 
#%d, cleaning up keyspace %s with column family store %s", cmd, keyspace, 
cfStore.name);
                logger.info(message);
                sendNotification("cleanup", message, new int[]{cmd, 
ActiveRepairService.Status.FINISHED.ordinal()});                            
            }
        },null);
    }

{code}

> Use JMX Notifications to Indicate Success/Failure of Long-Running Operations
> ----------------------------------------------------------------------------
>
>                 Key: CASSANDRA-7124
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-7124
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Tools
>            Reporter: Tyler Hobbs
>            Assignee: Rajanarayanan Thottuvaikkatumana
>            Priority: Minor
>              Labels: lhf
>             Fix For: 3.0
>
>         Attachments: cassandra-trunk-temp-7124.txt
>
>
> If {{nodetool cleanup}} or some other long-running operation takes too long 
> to complete, you'll see an error like the one in CASSANDRA-2126, so you can't 
> tell if the operation completed successfully or not.  CASSANDRA-4767 fixed 
> this for repairs with JMX notifications.  We should do something similar for 
> nodetool cleanup, compact, decommission, move, relocate, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to