Author: gdusbabek
Date: Mon Mar 8 15:40:44 2010
New Revision: 920360
URL: http://svn.apache.org/viewvc?rev=920360&view=rev
Log:
CASSANDRA-845 add status property to streaming mbean and change several DEBUG
messages to INFO. Patch by Gary Dusbabek, reviewed by Jonathan Ellis.
Modified:
incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingService.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
Modified: incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=920360&r1=920359&r2=920360&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.6/CHANGES.txt Mon Mar 8 15:40:44
2010
@@ -13,6 +13,8 @@
* Add logging of GC activity (CASSANDRA-813)
* fix ConcurrentModificationException in commitlog discard (CASSANDRA-853)
* Fix hardcoded row count in Hadoop RecordReader (CASSANDRA-837)
+ * Add a jmx status to the streaming service and change several DEBUG
+ messages to INFO (CASSANDRA-845)
0.6.0-beta1/beta2
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=920360&r1=920359&r2=920360&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Mon Mar 8 15:40:44 2010
@@ -33,13 +33,12 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamOutManager;
+
/**
* This class handles streaming data from one node to another.
@@ -59,6 +58,13 @@
private static Logger logger = Logger.getLogger(StreamOut.class);
static String TABLE_NAME = "STREAMING-TABLE-NAME";
+
+ private static void updateStatus(String msg)
+ {
+ StreamingService.instance.setStatus(msg);
+ if (logger.isInfoEnabled() && !StreamingService.NOTHING.equals(msg))
+ logger.info(msg);
+ }
/**
* Split out files for all tables on disk locally for each range and then
stream them to the target endpoint.
@@ -67,8 +73,7 @@
{
assert ranges.size() > 0;
- if (logger.isDebugEnabled())
- logger.debug("Beginning transfer process to " + target + " for
ranges " + StringUtils.join(ranges, ", "));
+ logger.debug("Beginning transfer process to " + target + " for ranges
" + StringUtils.join(ranges, ", "));
/*
* (1) dump all the memtables to disk.
@@ -78,8 +83,7 @@
try
{
Table table = Table.open(tableName);
- if (logger.isDebugEnabled())
- logger.debug("Flushing memtables ...");
+ updateStatus("Flushing memtables for " + tableName + "...");
for (Future f : table.flush())
{
try
@@ -95,8 +99,7 @@
throw new RuntimeException(e);
}
}
- if (logger.isDebugEnabled())
- logger.debug("Performing anticompaction ...");
+ updateStatus("Performing anticompaction ...");
/* Get the list of files that need to be streamed */
transferSSTables(target, table.forceAntiCompaction(ranges,
target), tableName); // SSTR GC deletes the file when done
}
@@ -104,6 +107,10 @@
{
throw new IOError(e);
}
+ finally
+ {
+ StreamingService.instance.setStatus(StreamingService.NOTHING);
+ }
if (callback != null)
callback.run();
}
@@ -125,23 +132,21 @@
}
}
if (logger.isDebugEnabled())
- logger.debug("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
-
+ logger.debug("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(pendingFiles);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
- if (logger.isDebugEnabled())
- logger.debug("Sending a stream initiate message to " + target + "
...");
+ updateStatus("Sending a stream initiate message to " + target + "
...");
MessagingService.instance.sendOneWay(message, target);
if (pendingFiles.length > 0)
{
- logger.info("Waiting for transfer to " + target + " to complete");
+ StreamingService.instance.setStatus("Waiting for transfer to " +
target + " to complete");
StreamOutManager.get(target).waitForStreamCompletion();
// todo: it would be good if there were a dafe way to remove the
StreamManager for target.
// (StreamManager will delete the streamed file on completion.)
- logger.info("Done with transfer to " + target);
+ updateStatus("Done with transfer to " + target);
}
}
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=920360&r1=920359&r2=920360&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingService.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingService.java
Mon Mar 8 15:40:44 2010
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming;
-import org.apache.cassandra.streaming.StreamingServiceMBean;
import org.apache.log4j.Logger;
import javax.management.MBeanServer;
@@ -35,6 +34,8 @@
private static final Logger logger =
Logger.getLogger(StreamingService.class);
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.streaming:type=StreamingService";
public static final StreamingService instance = new StreamingService();
+ static final String NOTHING = "Nothing is happening";
+ private String status = NOTHING;
private StreamingService()
{
@@ -48,6 +49,17 @@
throw new RuntimeException(e);
}
}
+
+ public void setStatus(String s)
+ {
+ assert s != null;
+ status = s;
+ }
+
+ public String getStatus()
+ {
+ return status;
+ }
/** hosts receiving outgoing streams. */
public Set<InetAddress> getStreamDestinations()
Modified:
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java?rev=920360&r1=920359&r2=920360&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
(original)
+++
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
Mon Mar 8 15:40:44 2010
@@ -36,4 +36,7 @@
/** details about incoming streams */
public List<String> getIncomingFiles(String host) throws IOException;
+
+ /** What's currently happening wrt streaming. */
+ public String getStatus();
}