Author: [email protected]
Date: Tue Nov  1 09:06:40 2011
New Revision: 1694

Log:
[AMDATUCASSANDRA-126] Fixed Cassandra shutdown so that it completes properly

Modified:
   
branches/amdatu-cassandra-0.2.1/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java

Modified: 
branches/amdatu-cassandra-0.2.1/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
--- 
branches/amdatu-cassandra-0.2.1/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
      (original)
+++ 
branches/amdatu-cassandra-0.2.1/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
      Tue Nov  1 09:06:40 2011
@@ -15,16 +15,24 @@
  */
 package org.amdatu.cassandra.application.service;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.amdatu.cassandra.application.CassandraConfigurationService;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.felix.dm.DependencyManager;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -121,8 +129,25 @@
             ThreadPoolExecutor mutationStage = 
StageManager.getStage(Stage.MUTATION);
             if (!mutationStage.isShutdown()) {
                 mutationStage.shutdown();
-                mutationStage.awaitTermination(1, TimeUnit.SECONDS);
-                CommitLog.instance.shutdownBlocking();
+                mutationStage.awaitTermination(30, TimeUnit.SECONDS);
+                CommitLog.instance.shutdownBlocking();
+                
+                List<Future<?>> flushes = new ArrayList<Future<?>>();
+                for (Table table : Table.all())
+                {
+                    KSMetaData ksm = 
DatabaseDescriptor.getKSMetaData(table.name);
+                    if (!ksm.isDurableWrites())
+                    {
+                        for (ColumnFamilyStore cfs : 
table.getColumnFamilyStores())
+                        {
+                            Future<?> future = cfs.forceFlush();
+                            if (future != null)
+                                flushes.add(future);
+                        }
+                    }
+                }
+                FBUtilities.waitOnFutures(flushes);
+                m_logService.log(LogService.LOG_INFO, "Cassandra Daemon 
shutdown completed.");
             }
         }
         catch (InterruptedException e) {
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to