Author: [email protected]
Date: Fri Feb 17 11:06:59 2012
New Revision: 2097
Log:
[AMDATUCASSANDRA-164] Changed start/stop methods to synchronized and removed
the separate Thread (which was not really necessary). This should avoid
concurrency issues with the DM.
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
Fri Feb 17 11:06:59 2012
@@ -15,6 +15,9 @@
*/
package org.amdatu.cassandra.client.service;
+import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.HOSTS;
+import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.PORT;
+
import org.amdatu.cassandra.client.CassandraDaemonAvailable;
import org.apache.cassandra.thrift.Cassandra;
@@ -27,8 +30,6 @@
import org.apache.thrift.transport.TTransportException;
import org.osgi.service.log.LogService;
-import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.*;
-
/**
* This service is responsible for registration of a CassandraDaemonAvailable
service in case
* a successful Thrift connection could be made with the daemon and its
operating mode is
@@ -50,84 +51,77 @@
private volatile LogService m_logService;
private volatile CassandraClientConfigurationService m_configuration;
- private CassandraDaemonListenerThread m_activatorThread = null;
private Component m_serviceAvailableComponent = null;
- public void start() {
- // Start a new daemon listener thread. It will register a
CassandraDaemonAvailableService
- // as soon as the daemon reached operation mode 'Normal' and a Thrift
connection could be
- // established
- m_activatorThread = new CassandraDaemonListenerThread();
- m_activatorThread.start();
+ public synchronized void start() {
+ // Test the Thrift connection and register a
CassandraDaemonAvailableService
+ // as soon as a Thrift connection could be established
+ registerAvailableServiec();
}
- public void stop() {
- // Interrupt the listener thread, if it is still running
- if (m_activatorThread != null && m_activatorThread.isAlive()) {
- m_activatorThread.interrupt();
- }
-
- // Unregister the CassandraDaemonAvailabkle service component
+ public synchronized void stop() {
+ // Unregister the CassandraDaemonAvailable service component
if (m_serviceAvailableComponent != null) {
m_dependencyManager.remove(m_serviceAvailableComponent);
}
}
- class CassandraDaemonListenerThread extends Thread {
- public void run() {
- try {
- // First wait until we can establish a Thrift connection to
the daemon, the connection is
- // established as soon as the daemon is running.
- testThriftConnection();
-
- // Register a new CassandraDaemonAvailable service
- CassandraDaemonAvailable service = new
CassandraDaemonAvailable() {
- };
-
- m_serviceAvailableComponent =
m_dependencyManager.createComponent()
- .setInterface(CassandraDaemonAvailable.class.getName(),
null)
- .setImplementation(service);
- m_dependencyManager.add(m_serviceAvailableComponent);
- }
- catch (TTransportException e) {
- m_logService.log(LogService.LOG_ERROR, "Could not establish
Thrift connection to Cassandra daemon, "
- + "daemon could not be started.");
- }
+ private void registerAvailableServiec() {
+ try {
+ // First wait until we can establish a Thrift connection to the
daemon, the connection is
+ // established as soon as the daemon is running.
+ testThriftConnection();
+
+ // Register a new CassandraDaemonAvailable service
+ CassandraDaemonAvailable service = new CassandraDaemonAvailable() {
+ };
+
+ m_serviceAvailableComponent = m_dependencyManager.createComponent()
+
.setInterface(CassandraDaemonAvailable.class.getName(), null)
+ .setImplementation(service);
+ m_dependencyManager.add(m_serviceAvailableComponent);
+ }
+ catch (TTransportException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not establish Thrift
connection to Cassandra daemon, "
+ + "daemon could not be started.");
}
+ }
- /**
- * This methods opens a Thrift connection to the Cassandra daemon and
returns if the connection
- * has been established. This is useful to ensure that the daemon is
running before continuing.
- *
- * @throws TTransportException
- */
- private void testThriftConnection() throws TTransportException {
- int retry = 0;
+ /**
+ * This methods opens a Thrift connection to the Cassandra daemon and
returns if the connection
+ * has been established. This is useful to ensure that the daemon is
running before continuing.
+ *
+ * @throws TTransportException
+ */
+ private void testThriftConnection() throws TTransportException {
+ int retry = 0;
+ try {
+ String thrift =
+ m_configuration.get(HOSTS, String.class) + ":" +
m_configuration.get(PORT, Integer.class);
+ m_logService.log(LogService.LOG_INFO, "Establishing Thrift
connection to the Cassandra Daemon on "
+ + thrift);
+ TTransport tr =
+ new TSocket(m_configuration.get(HOSTS,
String.class), m_configuration.get(PORT, Integer.class),
+ THRIFT_TIMEOUT);
+ TProtocol proto = new TBinaryProtocol(tr);
+ new Cassandra.Client(proto);
+ tr.open();
+ }
+ catch (TTransportException e) {
+ retry++;
+ if (retry >= THRIFT_RETRIES) {
+ throw e;
+ }
try {
- String thrift = m_configuration.get(HOSTS, String.class) + ":"
+ m_configuration.get(PORT, Integer.class);
- m_logService.log(LogService.LOG_INFO, "Establishing Thrift
connection to the Cassandra Daemon on "
- + thrift);
- TTransport tr = new TSocket(m_configuration.get(HOSTS,
String.class), m_configuration.get(PORT, Integer.class),
- THRIFT_TIMEOUT);
- TProtocol proto = new TBinaryProtocol(tr);
- new Cassandra.Client(proto);
- tr.open();
+ Thread.sleep(THRIFT_TIMEOUT);
}
- catch (TTransportException e) {
- retry++;
- if (retry >= THRIFT_RETRIES) {
- throw e;
- }
- try {
- Thread.sleep(THRIFT_TIMEOUT);
- }
- catch (InterruptedException e1) {
- m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift
connection interrupted");
- }
- m_logService.log(LogService.LOG_INFO, "Thrift connection
cannot yet be established, retrying... ("
- + retry + ")");
+ catch (InterruptedException e1) {
+ m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift
connection interrupted");
}
- m_logService.log(LogService.LOG_INFO, "Thrift connection
established successfully");
+ m_logService.log(LogService.LOG_INFO, "Thrift connection cannot
yet be established, retrying... ("
+ + retry + ")");
}
+ m_logService.log(LogService.LOG_INFO, "Thrift connection established
successfully");
}
+
}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits