Author: gdusbabek
Date: Wed Feb 10 22:16:14 2010
New Revision: 908682
URL: http://svn.apache.org/viewvc?rev=908682&view=rev
Log:
jmx interface for tracking streams (incoming and outgoing) and a general
operation-mode. Patch by Gary Dusbabek, reviewed by Jonathan Ellis.
CASSANDRA-709
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Feb 10 22:16:14 2010
@@ -23,6 +23,8 @@
* track latency in microseconds (CASSANDRA-733)
* add describe_ Thrift methods, deprecating get_string_property and
get_string_list_property
+ * jmx interface for tracking operation mode and streams in general.
+ (CASSANDRA-709)
0.5.1
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
Wed Feb 10 22:16:14 2010
@@ -25,6 +25,7 @@
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
+import org.apache.cassandra.streaming.StreamOutManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;
@@ -95,6 +96,7 @@
if (logger.isDebugEnabled())
logger.debug("Bytes transferred " + bytesTransferred);
start += bytesTransferred;
+ StreamOutManager.get(to).update(file, start);
}
}
finally
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Feb 10 22:16:14 2010
@@ -145,6 +145,7 @@
/* when intialized as a client, we shouldn't write to the system table. */
private boolean isClientMode;
private boolean initialized;
+ private String operationMode;
public void addBootstrapSource(InetAddress s, String table)
{
@@ -175,6 +176,7 @@
setToken(getLocalToken());
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(getLocalToken())));
logger_.info("Bootstrap/move completed! Now serving reads.");
+ setMode("Normal", false);
}
/** This method updates the local token on disk */
@@ -228,6 +230,10 @@
replicationStrategies.put(table, strat);
}
replicationStrategies =
Collections.unmodifiableMap(replicationStrategies);
+
+ // spin up the streaming service so it is available for jmx tools.
+ if (StreamingService.instance == null)
+ throw new RuntimeException("Streaming service is unavailable.");
}
public AbstractReplicationStrategy getReplicationStrategy(String table)
@@ -279,6 +285,7 @@
MessagingService.instance.listen(FBUtilities.getLocalAddress());
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(),
(int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
+ setMode("Client", false);
}
public synchronized void initServer() throws IOException
@@ -313,14 +320,16 @@
if (DatabaseDescriptor.isAutoBootstrap()
&&
!(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) ||
SystemTable.isBootstrapped()))
{
- logger_.info("Starting in bootstrap mode");
+ setMode("Joining: getting load information", true);
StorageLoadBalancer.instance.waitForLoadInfo();
- logger_.info("... got load info");
+ if (logger_.isDebugEnabled())
+ logger_.debug("... got load info");
if (tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
{
String s = "This node is already a member of the token ring;
bootstrap aborted. (If replacing a dead node, remove the old one from the ring
first.)";
throw new UnsupportedOperationException(s);
}
+ setMode("Joining: getting bootstrap token", true);
Token token = BootStrapper.getBootstrapToken(tokenMetadata_,
StorageLoadBalancer.instance.getLoadInfo());
startBootstrap(token);
// don't finish startup (enabling thrift) until after bootstrap is
done
@@ -342,17 +351,25 @@
Token token = storageMetadata_.getToken();
tokenMetadata_.updateNormalToken(token,
FBUtilities.getLocalAddress());
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(token)));
+ setMode("Normal", false);
}
assert tokenMetadata_.sortedTokens().size() > 0;
}
+ private void setMode(String m, boolean log)
+ {
+ operationMode = m;
+ if (log)
+ logger_.info(m);
+ }
+
private void startBootstrap(Token token) throws IOException
{
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us
part of the ring locally which is incorrect until we are done bootstrapping
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_BOOTSTRAPPING + Delimiter +
partitioner_.getTokenFactory().toString(token)));
- logger_.info("bootstrap sleeping " + RING_DELAY);
+ setMode("Joining: sleeping " + RING_DELAY + " for pending range
setup", true);
try
{
Thread.sleep(RING_DELAY);
@@ -361,6 +378,7 @@
{
throw new AssertionError(e);
}
+ setMode("Bootstrapping", true);
new BootStrapper(FBUtilities.getLocalAddress(), token,
tokenMetadata_).startBootstrap(); // handles token update
}
@@ -1271,10 +1289,10 @@
throw new UnsupportedOperationException("data is currently
moving to this node; unable to leave the ring");
}
- // leave the ring
- logger_.info("DECOMMISSIONING");
+ if (logger_.isDebugEnabled())
+ logger_.debug("DECOMMISSIONING");
startLeaving();
- logger_.info("decommission sleeping " + RING_DELAY);
+ setMode("Leaving: sleeping " + RING_DELAY + " for pending range
setup", true);
Thread.sleep(RING_DELAY);
Runnable finishLeaving = new Runnable()
@@ -1283,7 +1301,7 @@
{
Gossiper.instance.stop();
MessagingService.shutdown();
- logger_.info("DECOMMISSION FINISHED.");
+ setMode("Decommissioned", true);
// let op be responsible for killing the process
}
};
@@ -1296,8 +1314,6 @@
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
calculatePendingRanges();
- if (logger_.isDebugEnabled())
- logger_.debug("");
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter +
getLocalToken().toString()));
try
{
@@ -1323,6 +1339,7 @@
continue;
}
+ setMode("Leaving: streaming data to other nodes", true);
final Set<Map.Entry<Range, InetAddress>> pending =
Collections.synchronizedSet(new HashSet<Map.Entry<Range,
InetAddress>>(rangesMM.entries()));
for (final Map.Entry<Range, InetAddress> entry :
rangesMM.entries())
{
@@ -1388,10 +1405,10 @@
if (token != null && tokenMetadata_.sortedTokens().contains(token))
throw new IOException("target token " + token + " is already owned
by another node");
- // leave the ring
- logger_.info("starting move. leaving token " + getLocalToken());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Leaving: old token was " + getLocalToken());
startLeaving();
- logger_.info("move sleeping " + RING_DELAY);
+ setMode("Leaving: sleeping " + RING_DELAY + " for pending range
setup", true);
Thread.sleep(RING_DELAY);
Runnable finishMoving = new WrappedRunnable()
@@ -1481,6 +1498,11 @@
return false;
}
+ public String getOperationMode()
+ {
+ return operationMode;
+ }
+
// Never ever do this at home. Used by tests.
Map<String, AbstractReplicationStrategy>
setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy>
replacement)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Wed Feb 10 22:16:14 2010
@@ -157,4 +157,7 @@
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
+
+ /** get the operational mode (leaving, joining, normal, decommissioned,
client) **/
+ public String getOperationMode();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed Feb 10 22:16:14 2010
@@ -1,94 +1,101 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.net.InetSocketAddress;
-import java.net.InetAddress;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.io.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.FileStreamTask;
-
-public class IncomingStreamReader
-{
- private static Logger logger =
Logger.getLogger(IncomingStreamReader.class);
- private PendingFile pendingFile;
- private CompletedFileStatus streamStatus;
- private SocketChannel socketChannel;
-
- public IncomingStreamReader(SocketChannel socketChannel)
- {
- this.socketChannel = socketChannel;
- InetSocketAddress remoteAddress =
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- pendingFile =
StreamInManager.getStreamContext(remoteAddress.getAddress());
- assert pendingFile != null;
- streamStatus =
StreamInManager.getStreamStatus(remoteAddress.getAddress());
- assert streamStatus != null;
- }
-
- public void read() throws IOException
- {
- InetSocketAddress remoteAddress =
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- if (logger.isDebugEnabled())
- logger.debug("Creating file for " + pendingFile.getTargetFile());
- FileOutputStream fos = new
FileOutputStream(pendingFile.getTargetFile(), true);
- FileChannel fc = fos.getChannel();
-
- long bytesRead = 0;
- try
- {
- while (bytesRead < pendingFile.getExpectedBytes())
- bytesRead += fc.transferFrom(socketChannel, bytesRead,
FileStreamTask.CHUNK_SIZE);
- }
- catch (IOException ex)
- {
- /* Ask the source node to re-stream this file. */
-
streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
- handleStreamCompletion(remoteAddress.getAddress());
- /* Delete the orphaned file. */
- File file = new File(pendingFile.getTargetFile());
- file.delete();
- throw ex;
- }
-
- if (bytesRead == pendingFile.getExpectedBytes())
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Removing stream context " + pendingFile);
- }
- fc.close();
- handleStreamCompletion(remoteAddress.getAddress());
- }
- }
-
- private void handleStreamCompletion(InetAddress remoteHost) throws
IOException
- {
- /*
- * Streaming is complete. If all the data that has to be received
inform the sender via
- * the stream completion callback so that the source may perform the
requisite cleanup.
- */
- IStreamComplete streamComplete =
StreamInManager.getStreamCompletionHandler(remoteHost);
- if (streamComplete != null)
- streamComplete.onStreamCompletion(remoteHost, pendingFile,
streamStatus);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.FileStreamTask;
+
+public class IncomingStreamReader
+{
+ private static Logger logger =
Logger.getLogger(IncomingStreamReader.class);
+ private PendingFile pendingFile;
+ private CompletedFileStatus streamStatus;
+ private SocketChannel socketChannel;
+
+ /**
+  * Binds this reader to the pending file and completion status queued for the
+  * remote end of the given channel.
+  *
+  * @param socketChannel channel the file contents will be read from.
+  */
+ public IncomingStreamReader(SocketChannel socketChannel)
+ {
+     this.socketChannel = socketChannel;
+     InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+     pendingFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
+     assert pendingFile != null;
+     streamStatus = StreamInManager.getStreamStatus(remoteAddress.getAddress());
+     assert streamStatus != null;
+     // publish the stream only once both lookups succeeded; registering before the
+     // checks would leave an entry behind that read() never gets a chance to remove.
+     StreamInManager.activeStreams.put(remoteAddress.getAddress(), pendingFile);
+ }
+
+ /**
+  * Copies the pending file from the socket into its target file, publishing
+  * progress through {@code pendingFile.update} after every chunk.
+  * On an I/O failure the sender is asked to re-stream, the partial file is
+  * deleted and the exception is rethrown.
+  *
+  * @throws IOException if the transfer or the completion callback fails.
+  */
+ public void read() throws IOException
+ {
+     InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+     if (logger.isDebugEnabled())
+         logger.debug("Creating file for " + pendingFile.getTargetFile());
+     FileOutputStream fos = new FileOutputStream(pendingFile.getTargetFile(), true);
+     FileChannel fc = fos.getChannel();
+
+     long bytesRead = 0;
+     try
+     {
+         while (bytesRead < pendingFile.getExpectedBytes()) {
+             bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
+             pendingFile.update(bytesRead);
+         }
+     }
+     catch (IOException ex)
+     {
+         /* Ask the source node to re-stream this file. */
+         streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
+         handleStreamCompletion(remoteAddress.getAddress());
+         /* Release the handle first: some platforms refuse to delete a file that is still open. */
+         try
+         {
+             fc.close();
+         }
+         catch (IOException ignored)
+         {
+             // the transfer failure below is the error worth reporting.
+         }
+         /* Delete the orphaned file. */
+         File file = new File(pendingFile.getTargetFile());
+         file.delete();
+         throw ex;
+     }
+     finally
+     {
+         StreamInManager.activeStreams.remove(remoteAddress.getAddress(), pendingFile);
+         // always release the channel (and its stream); closing an already closed channel is a no-op.
+         fc.close();
+     }
+
+     if (bytesRead == pendingFile.getExpectedBytes())
+     {
+         if (logger.isDebugEnabled())
+         {
+             logger.debug("Removing stream context " + pendingFile);
+         }
+         handleStreamCompletion(remoteAddress.getAddress());
+     }
+ }
+
+ private void handleStreamCompletion(InetAddress remoteHost) throws
IOException
+ {
+ /*
+ * Streaming is complete. If all the data that has to be received
inform the sender via
+ * the stream completion callback so that the source may perform the
requisite cleanup.
+ */
+ IStreamComplete streamComplete =
StreamInManager.getStreamCompletionHandler(remoteHost);
+ if (streamComplete != null)
+ streamComplete.onStreamCompletion(remoteHost, pendingFile,
streamStatus);
+ }
+}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
Wed Feb 10 22:16:14 2010
@@ -1,90 +1,102 @@
-package org.apache.cassandra.streaming;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-
-class PendingFile
-{
- private static ICompactSerializer<PendingFile> serializer_;
-
- static
- {
- serializer_ = new InitiatedFileSerializer();
- }
-
- public static ICompactSerializer<PendingFile> serializer()
- {
- return serializer_;
- }
-
- private String targetFile_;
- private long expectedBytes_;
- private String table_;
-
- public PendingFile(String targetFile, long expectedBytes, String table)
- {
- targetFile_ = targetFile;
- expectedBytes_ = expectedBytes;
- table_ = table;
- }
-
- public String getTable()
- {
- return table_;
- }
-
- public String getTargetFile()
- {
- return targetFile_;
- }
-
- public void setTargetFile(String file)
- {
- targetFile_ = file;
- }
-
- public long getExpectedBytes()
- {
- return expectedBytes_;
- }
-
- public boolean equals(Object o)
- {
- if ( !(o instanceof PendingFile) )
- return false;
-
- PendingFile rhs = (PendingFile)o;
- return targetFile_.equals(rhs.targetFile_);
- }
-
- public int hashCode()
- {
- return toString().hashCode();
- }
-
- public String toString()
- {
- return targetFile_ + ":" + expectedBytes_;
- }
-
- private static class InitiatedFileSerializer implements
ICompactSerializer<PendingFile>
- {
- public void serialize(PendingFile sc, DataOutputStream dos) throws
IOException
- {
- dos.writeUTF(sc.targetFile_);
- dos.writeLong(sc.expectedBytes_);
- dos.writeUTF(sc.table_);
- }
-
- public PendingFile deserialize(DataInputStream dis) throws IOException
- {
- String targetFile = dis.readUTF();
- long expectedBytes = dis.readLong();
- String table = dis.readUTF();
- return new PendingFile(targetFile, expectedBytes, table);
- }
- }
-}
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+class PendingFile
+{
+ private static ICompactSerializer<PendingFile> serializer_;
+
+ static
+ {
+ serializer_ = new InitiatedFileSerializer();
+ }
+
+ public static ICompactSerializer<PendingFile> serializer()
+ {
+ return serializer_;
+ }
+
+ private String targetFile_;
+ private final long expectedBytes_;
+ private final String table_;
+ private long ptr_;
+
+ public PendingFile(String targetFile, long expectedBytes, String table)
+ {
+ targetFile_ = targetFile;
+ expectedBytes_ = expectedBytes;
+ table_ = table;
+ ptr_ = 0;
+ }
+
+ public void update(long ptr)
+ {
+ ptr_ = ptr;
+ }
+
+ public long getPtr()
+ {
+ return ptr_;
+ }
+
+ public String getTable()
+ {
+ return table_;
+ }
+
+ public String getTargetFile()
+ {
+ return targetFile_;
+ }
+
+ public void setTargetFile(String file)
+ {
+ targetFile_ = file;
+ }
+
+ public long getExpectedBytes()
+ {
+ return expectedBytes_;
+ }
+
+ /**
+  * Two pending files are equal when they name the same target file; the
+  * expected size and the transfer progress are not part of the identity.
+  */
+ public boolean equals(Object o)
+ {
+     if ( !(o instanceof PendingFile) )
+         return false;
+
+     PendingFile rhs = (PendingFile)o;
+     // compare the paths themselves. The previous hash comparison pitted the hash of
+     // the path against the hash of "path:expectedBytes", so it practically never matched.
+     return targetFile_.equals(rhs.targetFile_);
+ }
+
+ /** Hashes only the target file, so instances that are equal always share a hash code. */
+ public int hashCode()
+ {
+     return targetFile_.hashCode();
+ }
+
+ public String toString()
+ {
+ return targetFile_ + ":" + expectedBytes_;
+ }
+
+ private static class InitiatedFileSerializer implements
ICompactSerializer<PendingFile>
+ {
+ public void serialize(PendingFile sc, DataOutputStream dos) throws
IOException
+ {
+ dos.writeUTF(sc.targetFile_);
+ dos.writeLong(sc.expectedBytes_);
+ dos.writeUTF(sc.table_);
+ }
+
+ public PendingFile deserialize(DataInputStream dis) throws IOException
+ {
+ String targetFile = dis.readUTF();
+ long expectedBytes = dis.readLong();
+ String table = dis.readUTF();
+ return new PendingFile(targetFile, expectedBytes, table);
+ }
+ }
+}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
Wed Feb 10 22:16:14 2010
@@ -21,6 +21,9 @@
import java.util.*;
import java.net.InetAddress;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.apache.cassandra.streaming.IStreamComplete;
import org.apache.log4j.Logger;
@@ -35,7 +38,9 @@
public static final Map<InetAddress, List<CompletedFileStatus>>
streamStatusBag_ = new Hashtable<InetAddress, List<CompletedFileStatus>>();
/* Maintains a callback handler per endpoint to notify the app that a
stream from a given endpoint has been handled */
public static final Map<InetAddress, IStreamComplete>
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
-
+
+ public static final Multimap<InetAddress, PendingFile> activeStreams =
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
+
public synchronized static PendingFile getStreamContext(InetAddress key)
{
List<PendingFile> context = ctxBag_.get(key);
@@ -57,7 +62,27 @@
streamStatusBag_.remove(key);
return streamStatus;
}
-
+
+ /** query method to determine which hosts are streaming to this node. */
+ public static Set<InetAddress> getSources()
+ {
+ HashSet<InetAddress> set = new HashSet<InetAddress>();
+ set.addAll(ctxBag_.keySet());
+ set.addAll(activeStreams.keySet());
+ return set;
+ }
+
+ /** query the status of incoming files. */
+ public static List<PendingFile> getIncomingFiles(InetAddress host)
+ {
+ // avoid returning null.
+ List<PendingFile> list = new ArrayList<PendingFile>();
+ if (ctxBag_.containsKey(host))
+ list.addAll(ctxBag_.get(host));
+ list.addAll(activeStreams.get(host));
+ return list;
+ }
+
/*
* This method helps determine if the StreamCompletionHandler needs
* to be invoked for the data being streamed from a source.
@@ -101,5 +126,5 @@
streamStatusBag_.put(key, status);
}
status.add( streamStatus );
- }
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
Wed Feb 10 22:16:14 2010
@@ -139,6 +139,7 @@
{
logger.info("Waiting for transfer to " + target + " to complete");
StreamOutManager.get(target).waitForStreamCompletion();
+ // todo: it would be good if there were a safe way to remove the
StreamManager for target.
// (StreamManager will delete the streamed file on completion.)
logger.info("Done with transfer to " + target);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
Wed Feb 10 22:16:14 2010
@@ -21,7 +21,12 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,7 +42,7 @@
/**
* This class manages the streaming of multiple files one after the other.
*/
-class StreamOutManager
+public class StreamOutManager
{
private static Logger logger = Logger.getLogger( StreamOutManager.class );
@@ -54,8 +59,17 @@
}
return manager;
}
+
+ /** @return a new set holding the keys of streamManagers at the time of the call. */
+ public static Set<InetAddress> getDestinations()
+ {
+     // the result of streamManagers.keySet() isn't serializable, so copy it into a new set.
+     return new HashSet<InetAddress>(streamManagers.keySet());
+ }
+
+ // we need sequential and random access to the files. hence, the map and
the list.
+ private final List<PendingFile> files = new ArrayList<PendingFile>();
+ private final Map<String, PendingFile> fileMap = new HashMap<String,
PendingFile>();
- private final List<File> files = new ArrayList<File>();
private final InetAddress to;
private long totalBytes = 0L;
private final SimpleCondition condition = new SimpleCondition();
@@ -71,16 +85,24 @@
{
if (logger.isDebugEnabled())
logger.debug("Adding file " + pendingFile.getTargetFile() + " to
be streamed.");
- files.add( new File( pendingFile.getTargetFile() ) );
+ files.add(pendingFile);
+ fileMap.put(pendingFile.getTargetFile(), pendingFile);
totalBytes += pendingFile.getExpectedBytes();
}
}
+
+ public void update(String path, long pos)
+ {
+ PendingFile pf = fileMap.get(path);
+ if (pf != null)
+ pf.update(pos);
+ }
public void startNext()
{
if (files.size() > 0)
{
- File file = files.get(0);
+ File file = new File(files.get(0).getTargetFile());
if (logger.isDebugEnabled())
logger.debug("Streaming " + file.length() + " length file " +
file + " ...");
MessagingService.instance.stream(file.getAbsolutePath(), 0L,
file.length(), FBUtilities.getLocalAddress(), to);
@@ -93,7 +115,9 @@
if (logger.isDebugEnabled())
logger.debug("Deleting file " + file + " after streaming " +
f.length() + "/" + totalBytes + " bytes.");
FileUtils.delete(file);
- files.remove(0);
+ PendingFile pf = files.remove(0);
+ if (pf != null)
+ fileMap.remove(pf.getTargetFile());
if (files.size() > 0)
{
startNext();
@@ -117,4 +141,29 @@
throw new AssertionError(e);
}
}
+
+ List<PendingFile> getFiles()
+ {
+ return Collections.unmodifiableList(files);
+ }
+
+ public class StreamFile extends File
+ {
+ private long ptr = 0;
+ public StreamFile(String path)
+ {
+ super(path);
+ ptr = 0;
+ }
+
+ private void update(long ptr)
+ {
+ this.ptr = ptr;
+ }
+
+ public long getPtr()
+ {
+ return ptr;
+ }
+ }
}
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=908682&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
Wed Feb 10 22:16:14 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.streaming.StreamingServiceMBean;
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class StreamingService implements StreamingServiceMBean
+{
+ private static final Logger logger =
Logger.getLogger(StreamingService.class);
+ public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.streaming:type=StreamingService";
+ public static final StreamingService instance = new StreamingService();
+
+ private StreamingService()
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_OBJECT_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** hosts receiving outgoing streams. */
+ public Set<InetAddress> getStreamDestinations()
+ {
+ return StreamOutManager.getDestinations();
+ }
+
+ /** outgoing streams */
+ public List<String> getOutgoingFiles(String host) throws IOException
+ {
+ List<String> files = new ArrayList<String>();
+ // first, verify that host is a destination. calling
StreamOutManager.get will put it in the collection
+ // leading to false positives in the future.
+ Set<InetAddress> existingDestinations = getStreamDestinations();
+ InetAddress dest = InetAddress.getByName(host);
+ if (!existingDestinations.contains(dest))
+ return files;
+
+ StreamOutManager manager = StreamOutManager.get(dest);
+ for (PendingFile f : manager.getFiles())
+ files.add(String.format("%s %d/%d", f.getTargetFile(), f.getPtr(),
f.getExpectedBytes()));
+ return files;
+ }
+
+ /** hosts sending incoming streams */
+ public Set<InetAddress> getStreamSources()
+ {
+ return StreamInManager.getSources();
+ }
+
+ /** details about incoming streams. */
+ public List<String> getIncomingFiles(String host) throws IOException
+ {
+ List<String> files = new ArrayList<String>();
+ for (PendingFile pf :
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
+ {
+ files.add(String.format("%s: %s %d/%d", pf.getTable(),
pf.getTargetFile(), pf.getPtr(), pf.getExpectedBytes()));
+ }
+ return files;
+ }
+}
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java?rev=908682&view=auto
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
(added)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
Wed Feb 10 22:16:14 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Set;
+
+public interface StreamingServiceMBean
+{
+    /**
+     * Hosts receiving outgoing streams.
+     *
+     * @return the addresses of the stream destinations
+     */
+    Set<InetAddress> getStreamDestinations();
+
+    /**
+     * Outgoing streams for a single destination.
+     *
+     * @param host the destination host, given as a string
+     * @return one textual entry per outgoing file
+     * @throws IOException declared so implementations can report I/O failures
+     */
+    List<String> getOutgoingFiles(String host) throws IOException;
+
+    /**
+     * Hosts sending incoming streams.
+     *
+     * @return the addresses of the stream sources
+     */
+    Set<InetAddress> getStreamSources();
+
+    /**
+     * Details about incoming streams from a single source.
+     *
+     * @param host the source host, given as a string
+     * @return one textual entry per incoming file
+     * @throws IOException declared so implementations can report I/O failures
+     */
+    List<String> getIncomingFiles(String host) throws IOException;
+}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
Wed Feb 10 22:16:14 2010
@@ -3,9 +3,11 @@
import java.io.IOException;
import java.io.PrintStream;
import java.lang.management.MemoryUsage;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -20,6 +22,7 @@
import org.apache.cassandra.dht.Range;
import org.apache.commons.cli.*;
+import org.apache.commons.lang.StringUtils;
public class NodeCmd {
private static final String HOST_OPT_LONG = "host";
@@ -55,7 +58,8 @@
"%nAvailable commands: ring, info, cleanup, compact, cfstats,
snapshot [snapshotname], clearsnapshot, " +
"tpstats, flush, repair, decommission, move, loadbalance,
removetoken, " +
"setcachecapacity <keyspace> <cfname> <keycachecapacity>
<rowcachecapacity>, " +
- "getcompactionthreshold, setcompactionthreshold [minthreshold]
([maxthreshold])");
+ "getcompactionthreshold, setcompactionthreshold [minthreshold]
([maxthreshold]), " +
+ "streams [host]");
String usage = String.format("java %s --host <arg> <command>%n",
NodeCmd.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -173,6 +177,60 @@
double memMax = (double)heapUsage.getMax() / (1024 * 1024);
outs.println(String.format("%-17s: %.2f / %.2f", "Heap Memory (MB)",
memUsed, memMax));
}
+
+ /**
+  * Prints the node's operation mode, then its outgoing streams, then its incoming streams.
+  *
+  * @param addr limits the report to this one host; when null, every destination and every
+  *             source reported by the probe is listed
+  * @param outs stream the report is written to
+  */
+ public void printStreamInfo(final InetAddress addr, PrintStream outs)
+ {
+     outs.println(String.format("Mode: %s", probe.getOperationMode()));
+
+     // Collections.singleton replaces double-brace initialization, which defined an anonymous
+     // HashSet subclass capturing the enclosing instance just to hold a single element.
+     Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations() : Collections.singleton(addr);
+     if (hosts.isEmpty())
+         outs.println("Not sending any streams.");
+     for (InetAddress host : hosts)
+     {
+         try
+         {
+             List<String> files = probe.getFilesDestinedFor(host);
+             if (files.isEmpty())
+             {
+                 outs.println(String.format(" Nothing streaming to %s", host));
+             }
+             else
+             {
+                 outs.println(String.format("Streaming to: %s", host));
+                 for (String file : files)
+                     outs.println(String.format(" %s", file));
+             }
+         }
+         catch (IOException ex)
+         {
+             // Best effort: a failure for one host must not abort the report for the others.
+             outs.println(String.format(" Error retrieving file data for %s", host));
+         }
+     }
+
+     hosts = addr == null ? probe.getStreamSources() : Collections.singleton(addr);
+     if (hosts.isEmpty())
+         outs.println("Not receiving any streams.");
+     for (InetAddress host : hosts)
+     {
+         try
+         {
+             List<String> files = probe.getIncomingFiles(host);
+             if (files.isEmpty())
+             {
+                 outs.println(String.format(" Nothing streaming from %s", host));
+             }
+             else
+             {
+                 outs.println(String.format("Streaming from: %s", host));
+                 for (String file : files)
+                     outs.println(String.format(" %s", file));
+             }
+         }
+         catch (IOException ex)
+         {
+             // Best effort: a failure for one host must not abort the report for the others.
+             outs.println(String.format(" Error retrieving file data for %s", host));
+         }
+     }
+ }
public void printColumnFamilyStats(PrintStream outs)
{
@@ -466,6 +524,11 @@
}
probe.setCompactionThreshold(minthreshold, maxthreshold);
}
+ else if (cmdName.equals("streams"))
+ {
+ String otherHost = arguments.length > 1 ? arguments[1] : null;
+ nodeCmd.printStreamInfo(otherHost == null ? null :
InetAddress.getByName(otherHost), System.out);
+ }
else
{
System.err.println("Unrecognized command: " + cmdName + ".");
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Wed Feb 10 22:16:14 2010
@@ -47,6 +47,8 @@
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.streaming.StreamingService;
+import org.apache.cassandra.streaming.StreamingServiceMBean;
/**
* JMX client operations for Cassandra.
@@ -64,6 +66,7 @@
private MemoryMXBean memProxy;
private RuntimeMXBean runtimeProxy;
private CompactionManagerMBean mcmProxy;
+ private StreamingServiceMBean streamProxy;
/**
* Creates a NodeProbe using the specified JMX host and port.
@@ -109,6 +112,8 @@
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name,
StorageServiceMBean.class);
name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
mcmProxy = JMX.newMBeanProxy(mbeanServerConn, name,
CompactionManagerMBean.class);
+ name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
+ streamProxy = JMX.newMBeanProxy(mbeanServerConn, name,
StreamingServiceMBean.class);
} catch (MalformedObjectNameException e)
{
throw new RuntimeException(
@@ -400,6 +405,31 @@
{
return ssProxy.getNaturalEndpoints(key, table);
}
+
+ /** @return the stream destinations reported by the remote streaming MBean */
+ public Set<InetAddress> getStreamDestinations()
+ {
+     final Set<InetAddress> destinations = streamProxy.getStreamDestinations();
+     return destinations;
+ }
+
+ /**
+  * Asks the remote streaming MBean for the outgoing files of one destination.
+  *
+  * @param host destination host; passed to the MBean as its textual IP address
+  * @return the entries returned by the MBean's {@code getOutgoingFiles}
+  * @throws IOException propagated from the MBean call
+  */
+ public List<String> getFilesDestinedFor(InetAddress host) throws IOException
+ {
+     final String address = host.getHostAddress();
+     return streamProxy.getOutgoingFiles(address);
+ }
+
+ /** @return the stream sources reported by the remote streaming MBean */
+ public Set<InetAddress> getStreamSources()
+ {
+     final Set<InetAddress> sources = streamProxy.getStreamSources();
+     return sources;
+ }
+
+ /**
+  * Asks the remote streaming MBean for the incoming files of one source.
+  *
+  * @param host source host; passed to the MBean as its textual IP address
+  * @return the entries returned by the MBean's {@code getIncomingFiles}
+  * @throws IOException propagated from the MBean call
+  */
+ public List<String> getIncomingFiles(InetAddress host) throws IOException
+ {
+     final String address = host.getHostAddress();
+     return streamProxy.getIncomingFiles(address);
+ }
+
+ /** @return the operation mode string reported by the remote storage service MBean */
+ public String getOperationMode()
+ {
+     final String mode = ssProxy.getOperationMode();
+     return mode;
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String,
ColumnFamilyStoreMBean>>