Author: [email protected]
Date: Tue Nov 1 09:07:25 2011
New Revision: 1695
Log:
[AMDATUCASSANDRA-126] Merged fix from 0.2.1 branch
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
--- trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java (original)
+++ trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java Tue Nov 1 09:07:25 2011
@@ -15,16 +15,24 @@
*/
package org.amdatu.cassandra.application.service;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.amdatu.cassandra.application.CassandraConfigurationService;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.felix.dm.DependencyManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -121,8 +129,25 @@
ThreadPoolExecutor mutationStage =
StageManager.getStage(Stage.MUTATION);
if (!mutationStage.isShutdown()) {
mutationStage.shutdown();
- mutationStage.awaitTermination(1, TimeUnit.SECONDS);
- CommitLog.instance.shutdownBlocking();
+ mutationStage.awaitTermination(30, TimeUnit.SECONDS);
+ CommitLog.instance.shutdownBlocking();
+
+ List<Future<?>> flushes = new ArrayList<Future<?>>();
+ for (Table table : Table.all())
+ {
+ KSMetaData ksm =
DatabaseDescriptor.getKSMetaData(table.name);
+ if (!ksm.isDurableWrites())
+ {
+ for (ColumnFamilyStore cfs :
table.getColumnFamilyStores())
+ {
+ Future<?> future = cfs.forceFlush();
+ if (future != null)
+ flushes.add(future);
+ }
+ }
+ }
+ FBUtilities.waitOnFutures(flushes);
+ m_logService.log(LogService.LOG_INFO, "Cassandra Daemon
shutdown completed.");
}
}
catch (InterruptedException e) {
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits