Author: gdusbabek
Date: Wed Feb 10 22:16:14 2010
New Revision: 908682

URL: http://svn.apache.org/viewvc?rev=908682&view=rev
Log:
jmx interface for tracking streams (incoming and outgoing) and a general 
operation-mode.  Patch by Gary Dusbabek, reviewed by Jonathan Ellis.  
CASSANDRA-709

Added:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
Modified:
    incubator/cassandra/trunk/CHANGES.txt
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Feb 10 22:16:14 2010
@@ -23,6 +23,8 @@
  * track latency in microseconds (CASSANDRA-733)
  * add describe_ Thrift methods, deprecating get_string_property and 
    get_string_list_property
+ * jmx interface for tracking operation mode and streams in general.
+   (CASSANDRA-709)
 
 
 0.5.1

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java 
Wed Feb 10 22:16:14 2010
@@ -25,6 +25,7 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.cassandra.streaming.StreamOutManager;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.log4j.Logger;
 
@@ -95,6 +96,7 @@
                 if (logger.isDebugEnabled())
                     logger.debug("Bytes transferred " + bytesTransferred);
                 start += bytesTransferred;
+                StreamOutManager.get(to).update(file, start);
             }
         }
         finally

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 Wed Feb 10 22:16:14 2010
@@ -145,6 +145,7 @@
     /* when intialized as a client, we shouldn't write to the system table. */
     private boolean isClientMode;
     private boolean initialized;
+    private String operationMode;
 
     public void addBootstrapSource(InetAddress s, String table)
     {
@@ -175,6 +176,7 @@
         setToken(getLocalToken());
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new 
ApplicationState(STATE_NORMAL + Delimiter + 
partitioner_.getTokenFactory().toString(getLocalToken())));
         logger_.info("Bootstrap/move completed! Now serving reads.");
+        setMode("Normal", false);
     }
 
     /** This method updates the local token on disk  */
@@ -228,6 +230,10 @@
             replicationStrategies.put(table, strat);
         }
         replicationStrategies = 
Collections.unmodifiableMap(replicationStrategies);
+
+        // spin up the streaming service so it is available for jmx tools.
+        if (StreamingService.instance == null)
+            throw new RuntimeException("Streaming service is unavailable.");
     }
 
     public AbstractReplicationStrategy getReplicationStrategy(String table)
@@ -279,6 +285,7 @@
         MessagingService.instance.listen(FBUtilities.getLocalAddress());
         Gossiper.instance.register(this);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), 
(int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
+        setMode("Client", false);
     }
 
     public synchronized void initServer() throws IOException
@@ -313,14 +320,16 @@
         if (DatabaseDescriptor.isAutoBootstrap()
             && 
!(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || 
SystemTable.isBootstrapped()))
         {
-            logger_.info("Starting in bootstrap mode");
+            setMode("Joining: getting load information", true);
             StorageLoadBalancer.instance.waitForLoadInfo();
-            logger_.info("... got load info");
+            if (logger_.isDebugEnabled())
+                logger_.debug("... got load info");
             if (tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
             {
                 String s = "This node is already a member of the token ring; 
bootstrap aborted. (If replacing a dead node, remove the old one from the ring 
first.)";
                 throw new UnsupportedOperationException(s);
             }
+            setMode("Joining: getting bootstrap token", true);
             Token token = BootStrapper.getBootstrapToken(tokenMetadata_, 
StorageLoadBalancer.instance.getLoadInfo());
             startBootstrap(token);
             // don't finish startup (enabling thrift) until after bootstrap is 
done
@@ -342,17 +351,25 @@
             Token token = storageMetadata_.getToken();
             tokenMetadata_.updateNormalToken(token, 
FBUtilities.getLocalAddress());
             Gossiper.instance.addLocalApplicationState(MOVE_STATE, new 
ApplicationState(STATE_NORMAL + Delimiter + 
partitioner_.getTokenFactory().toString(token)));
+            setMode("Normal", false);
         }
 
         assert tokenMetadata_.sortedTokens().size() > 0;
     }
 
+    private void setMode(String m, boolean log)
+    {
+        operationMode = m;
+        if (log)
+            logger_.info(m);
+    }
+
     private void startBootstrap(Token token) throws IOException
     {
         isBootstrapMode = true;
         SystemTable.updateToken(token); // DON'T use setToken, that makes us 
part of the ring locally which is incorrect until we are done bootstrapping
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new 
ApplicationState(STATE_BOOTSTRAPPING + Delimiter + 
partitioner_.getTokenFactory().toString(token)));
-        logger_.info("bootstrap sleeping " + RING_DELAY);
+        setMode("Joining: sleeping " + RING_DELAY + " for pending range 
setup", true);
         try
         {
             Thread.sleep(RING_DELAY);
@@ -361,6 +378,7 @@
         {
             throw new AssertionError(e);
         }
+        setMode("Bootstrapping", true);
         new BootStrapper(FBUtilities.getLocalAddress(), token, 
tokenMetadata_).startBootstrap(); // handles token update
     }
 
@@ -1271,10 +1289,10 @@
                 throw new UnsupportedOperationException("data is currently 
moving to this node; unable to leave the ring");
         }
 
-        // leave the ring
-        logger_.info("DECOMMISSIONING");
+        if (logger_.isDebugEnabled())
+            logger_.debug("DECOMMISSIONING");
         startLeaving();
-        logger_.info("decommission sleeping " + RING_DELAY);
+        setMode("Leaving: sleeping " + RING_DELAY + " for pending range 
setup", true);
         Thread.sleep(RING_DELAY);
 
         Runnable finishLeaving = new Runnable()
@@ -1283,7 +1301,7 @@
             {
                 Gossiper.instance.stop();
                 MessagingService.shutdown();
-                logger_.info("DECOMMISSION FINISHED.");
+                setMode("Decommissioned", true);
                 // let op be responsible for killing the process
             }
         };
@@ -1296,8 +1314,6 @@
         tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
         calculatePendingRanges();
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("");
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new 
ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter + 
getLocalToken().toString()));
         try
         {
@@ -1323,6 +1339,7 @@
                 continue;
             }
 
+            setMode("Leaving: streaming data to other nodes", true);
             final Set<Map.Entry<Range, InetAddress>> pending = 
Collections.synchronizedSet(new HashSet<Map.Entry<Range, 
InetAddress>>(rangesMM.entries()));
             for (final Map.Entry<Range, InetAddress> entry : 
rangesMM.entries())
             {
@@ -1388,10 +1405,10 @@
         if (token != null && tokenMetadata_.sortedTokens().contains(token))
             throw new IOException("target token " + token + " is already owned 
by another node");
 
-        // leave the ring
-        logger_.info("starting move. leaving token " + getLocalToken());
+        if (logger_.isDebugEnabled())
+            logger_.debug("Leaving: old token was " + getLocalToken());
         startLeaving();
-        logger_.info("move sleeping " + RING_DELAY);
+         setMode("Leaving: sleeping " + RING_DELAY + " for pending range 
setup", true);
         Thread.sleep(RING_DELAY);
 
         Runnable finishMoving = new WrappedRunnable()
@@ -1481,6 +1498,11 @@
         return false;
     }
 
+    public String getOperationMode()
+    {
+        return operationMode;
+    }
+
     // Never ever do this at home. Used by tests.
     Map<String, AbstractReplicationStrategy> 
setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy> 
replacement)
     {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 Wed Feb 10 22:16:14 2010
@@ -157,4 +157,7 @@
 
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);
+
+    /** get the operational mode (leaving, joining, normal, decommissioned, 
client) **/
+    public String getOperationMode();
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Wed Feb 10 22:16:14 2010
@@ -1,94 +1,101 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.net.InetSocketAddress;
-import java.net.InetAddress;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.io.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.net.FileStreamTask;
-
-public class IncomingStreamReader
-{
-    private static Logger logger = 
Logger.getLogger(IncomingStreamReader.class);
-    private PendingFile pendingFile;
-    private CompletedFileStatus streamStatus;
-    private SocketChannel socketChannel;
-
-    public IncomingStreamReader(SocketChannel socketChannel)
-    {
-        this.socketChannel = socketChannel;
-        InetSocketAddress remoteAddress = 
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        pendingFile = 
StreamInManager.getStreamContext(remoteAddress.getAddress());
-        assert pendingFile != null;
-        streamStatus = 
StreamInManager.getStreamStatus(remoteAddress.getAddress());
-        assert streamStatus != null;
-    }
-
-    public void read() throws IOException
-    {
-        InetSocketAddress remoteAddress = 
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        if (logger.isDebugEnabled())
-          logger.debug("Creating file for " + pendingFile.getTargetFile());
-        FileOutputStream fos = new 
FileOutputStream(pendingFile.getTargetFile(), true);
-        FileChannel fc = fos.getChannel();
-
-        long bytesRead = 0;
-        try
-        {
-            while (bytesRead < pendingFile.getExpectedBytes())
-                bytesRead += fc.transferFrom(socketChannel, bytesRead, 
FileStreamTask.CHUNK_SIZE);
-        }
-        catch (IOException ex)
-        {
-            /* Ask the source node to re-stream this file. */
-            
streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
-            handleStreamCompletion(remoteAddress.getAddress());
-            /* Delete the orphaned file. */
-            File file = new File(pendingFile.getTargetFile());
-            file.delete();
-            throw ex;
-        }
-
-        if (bytesRead == pendingFile.getExpectedBytes())
-        {
-            if (logger.isDebugEnabled())
-            {
-                logger.debug("Removing stream context " + pendingFile);
-            }
-            fc.close();
-            handleStreamCompletion(remoteAddress.getAddress());
-        }
-    }
-
-    private void handleStreamCompletion(InetAddress remoteHost) throws 
IOException
-    {
-        /*
-         * Streaming is complete. If all the data that has to be received 
inform the sender via
-         * the stream completion callback so that the source may perform the 
requisite cleanup.
-        */
-        IStreamComplete streamComplete = 
StreamInManager.getStreamCompletionHandler(remoteHost);
-        if (streamComplete != null)
-            streamComplete.onStreamCompletion(remoteHost, pendingFile, 
streamStatus);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.InetSocketAddress;
+import java.net.InetAddress;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.net.FileStreamTask;
+
+public class IncomingStreamReader
+{
+    private static Logger logger = 
Logger.getLogger(IncomingStreamReader.class);
+    private PendingFile pendingFile;
+    private CompletedFileStatus streamStatus;
+    private SocketChannel socketChannel;
+
+    public IncomingStreamReader(SocketChannel socketChannel)
+    {
+        this.socketChannel = socketChannel;
+        InetSocketAddress remoteAddress = 
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        pendingFile = 
StreamInManager.getStreamContext(remoteAddress.getAddress());
+        StreamInManager.activeStreams.put(remoteAddress.getAddress(), 
pendingFile);
+        assert pendingFile != null;
+        streamStatus = 
StreamInManager.getStreamStatus(remoteAddress.getAddress());
+        assert streamStatus != null;
+    }
+
+    public void read() throws IOException
+    {
+        InetSocketAddress remoteAddress = 
(InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        if (logger.isDebugEnabled())
+          logger.debug("Creating file for " + pendingFile.getTargetFile());
+        FileOutputStream fos = new 
FileOutputStream(pendingFile.getTargetFile(), true);
+        FileChannel fc = fos.getChannel();
+
+        long bytesRead = 0;
+        try
+        {
+            while (bytesRead < pendingFile.getExpectedBytes()) {
+                bytesRead += fc.transferFrom(socketChannel, bytesRead, 
FileStreamTask.CHUNK_SIZE);
+                pendingFile.update(bytesRead);
+            }
+        }
+        catch (IOException ex)
+        {
+            /* Ask the source node to re-stream this file. */
+            
streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
+            handleStreamCompletion(remoteAddress.getAddress());
+            /* Delete the orphaned file. */
+            File file = new File(pendingFile.getTargetFile());
+            file.delete();
+            throw ex;
+        }
+        finally
+        {
+            StreamInManager.activeStreams.remove(remoteAddress.getAddress(), 
pendingFile);
+        }
+
+        if (bytesRead == pendingFile.getExpectedBytes())
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Removing stream context " + pendingFile);
+            }
+            fc.close();
+            handleStreamCompletion(remoteAddress.getAddress());
+        }
+    }
+
+    private void handleStreamCompletion(InetAddress remoteHost) throws 
IOException
+    {
+        /*
+         * Streaming is complete. If all the data that has to be received 
inform the sender via
+         * the stream completion callback so that the source may perform the 
requisite cleanup.
+        */
+        IStreamComplete streamComplete = 
StreamInManager.getStreamCompletionHandler(remoteHost);
+        if (streamComplete != null)
+            streamComplete.onStreamCompletion(remoteHost, pendingFile, 
streamStatus);
+    }
+}

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
 Wed Feb 10 22:16:14 2010
@@ -1,90 +1,102 @@
-package org.apache.cassandra.streaming;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-
-class PendingFile
-{
-    private static ICompactSerializer<PendingFile> serializer_;
-
-    static
-    {
-        serializer_ = new InitiatedFileSerializer();
-    }
-
-    public static ICompactSerializer<PendingFile> serializer()
-    {
-        return serializer_;
-    }
-
-    private String targetFile_;
-    private long expectedBytes_;
-    private String table_;
-
-    public PendingFile(String targetFile, long expectedBytes, String table)
-    {
-        targetFile_ = targetFile;
-        expectedBytes_ = expectedBytes;
-        table_ = table;
-    }
-
-    public String getTable()
-    {
-        return table_;
-    }
-
-    public String getTargetFile()
-    {
-        return targetFile_;
-    }
-
-    public void setTargetFile(String file)
-    {
-        targetFile_ = file;
-    }
-
-    public long getExpectedBytes()
-    {
-        return expectedBytes_;
-    }
-
-    public boolean equals(Object o)
-    {
-        if ( !(o instanceof PendingFile) )
-            return false;
-
-        PendingFile rhs = (PendingFile)o;
-        return targetFile_.equals(rhs.targetFile_);
-    }
-
-    public int hashCode()
-    {
-        return toString().hashCode();
-    }
-
-    public String toString()
-    {
-        return targetFile_ + ":" + expectedBytes_;
-    }
-
-    private static class InitiatedFileSerializer implements 
ICompactSerializer<PendingFile>
-    {
-        public void serialize(PendingFile sc, DataOutputStream dos) throws 
IOException
-        {
-            dos.writeUTF(sc.targetFile_);
-            dos.writeLong(sc.expectedBytes_);
-            dos.writeUTF(sc.table_);
-        }
-
-        public PendingFile deserialize(DataInputStream dis) throws IOException
-        {
-            String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            String table = dis.readUTF();
-            return new PendingFile(targetFile, expectedBytes, table);
-        }
-    }
-}
+package org.apache.cassandra.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+class PendingFile
+{
+    private static ICompactSerializer<PendingFile> serializer_;
+
+    static
+    {
+        serializer_ = new InitiatedFileSerializer();
+    }
+
+    public static ICompactSerializer<PendingFile> serializer()
+    {
+        return serializer_;
+    }
+
+    private String targetFile_;
+    private final long expectedBytes_;
+    private final String table_;
+    private long ptr_;
+
+    public PendingFile(String targetFile, long expectedBytes, String table)
+    {
+        targetFile_ = targetFile;
+        expectedBytes_ = expectedBytes;
+        table_ = table;
+        ptr_ = 0;
+    }
+
+    public void update(long ptr)
+    {
+        ptr_ = ptr;
+    }
+
+    public long getPtr()
+    {
+        return ptr_;
+    }
+
+    public String getTable()
+    {
+        return table_;
+    }
+
+    public String getTargetFile()
+    {
+        return targetFile_;
+    }
+
+    public void setTargetFile(String file)
+    {
+        targetFile_ = file;
+    }
+
+    public long getExpectedBytes()
+    {
+        return expectedBytes_;
+    }
+
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof PendingFile) )
+            return false;
+
+        PendingFile rhs = (PendingFile)o;
+        return targetFile_.equals(rhs.targetFile_); // identity is the target path, same key as hashCode()
+    }
+
+    public int hashCode()
+    {
+        return targetFile_.hashCode(); // must agree with equals(): hash only the target path
+    }
+
+    public String toString()
+    {
+        return targetFile_ + ":" + expectedBytes_;
+    }
+
+    private static class InitiatedFileSerializer implements 
ICompactSerializer<PendingFile>
+    {
+        public void serialize(PendingFile sc, DataOutputStream dos) throws 
IOException
+        {
+            dos.writeUTF(sc.targetFile_);
+            dos.writeLong(sc.expectedBytes_);
+            dos.writeUTF(sc.table_);
+        }
+
+        public PendingFile deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            String table = dis.readUTF();
+            return new PendingFile(targetFile, expectedBytes, table);
+        }
+    }
+}

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInManager.java
 Wed Feb 10 22:16:14 2010
@@ -21,6 +21,9 @@
 import java.util.*;
 import java.net.InetAddress;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import org.apache.cassandra.streaming.IStreamComplete;
 
 import org.apache.log4j.Logger;
@@ -35,7 +38,9 @@
     public static final Map<InetAddress, List<CompletedFileStatus>> 
streamStatusBag_ = new Hashtable<InetAddress, List<CompletedFileStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a 
stream from a given endpoint has been handled */
     public static final Map<InetAddress, IStreamComplete> 
streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
-    
+
+    public static final Multimap<InetAddress, PendingFile> activeStreams = 
Multimaps.synchronizedMultimap(HashMultimap.<InetAddress, PendingFile>create());
+
     public synchronized static PendingFile getStreamContext(InetAddress key)
     {        
         List<PendingFile> context = ctxBag_.get(key);
@@ -57,7 +62,27 @@
             streamStatusBag_.remove(key);
         return streamStatus;
     }
-    
+
+    /** query method to determine which hosts are streaming to this node. */
+    public static Set<InetAddress> getSources()
+    {
+        HashSet<InetAddress> set = new HashSet<InetAddress>();
+        set.addAll(ctxBag_.keySet());
+        set.addAll(activeStreams.keySet());
+        return set;
+    }
+
+    /** query the status of incoming files. */
+    public static List<PendingFile> getIncomingFiles(InetAddress host)
+    {
+        // avoid returning null.
+        List<PendingFile> list = new ArrayList<PendingFile>();
+        if (ctxBag_.containsKey(host))
+            list.addAll(ctxBag_.get(host));
+        list.addAll(activeStreams.get(host));
+        return list;
+    }
+
     /*
      * This method helps determine if the StreamCompletionHandler needs
      * to be invoked for the data being streamed from a source. 
@@ -101,5 +126,5 @@
             streamStatusBag_.put(key, status);
         }
         status.add( streamStatus );
-    }        
+    }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
 Wed Feb 10 22:16:14 2010
@@ -139,6 +139,7 @@
         {
             logger.info("Waiting for transfer to " + target + " to complete");
             StreamOutManager.get(target).waitForStreamCompletion();
+            // todo: it would be good if there were a safe way to remove the 
StreamManager for target.
             // (StreamManager will delete the streamed file on completion.)
             logger.info("Done with transfer to " + target);
         }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
 Wed Feb 10 22:16:14 2010
@@ -21,7 +21,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -37,7 +42,7 @@
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-class StreamOutManager
+public class StreamOutManager
 {   
     private static Logger logger = Logger.getLogger( StreamOutManager.class );
         
@@ -54,8 +59,17 @@
         }
         return manager;
     }
+
+    public static Set<InetAddress> getDestinations()
+    {
+        // the results of streamManagers.keySet() isn't serializable, so 
create a new set.
+        return new HashSet<InetAddress>(streamManagers.keySet());
+    }
+
+    // we need sequential and random access to the files. hence, the map and 
the list.
+    private final List<PendingFile> files = new ArrayList<PendingFile>();
+    private final Map<String, PendingFile> fileMap = new HashMap<String, 
PendingFile>();
     
-    private final List<File> files = new ArrayList<File>();
     private final InetAddress to;
     private long totalBytes = 0L;
     private final SimpleCondition condition = new SimpleCondition();
@@ -71,16 +85,24 @@
         {
             if (logger.isDebugEnabled())
               logger.debug("Adding file " + pendingFile.getTargetFile() + " to 
be streamed.");
-            files.add( new File( pendingFile.getTargetFile() ) );
+            files.add(pendingFile);
+            fileMap.put(pendingFile.getTargetFile(), pendingFile);
             totalBytes += pendingFile.getExpectedBytes();
         }
     }
+
+    public void update(String path, long pos)
+    {
+        PendingFile pf = fileMap.get(path);
+        if (pf != null)
+            pf.update(pos);
+    }
     
     public void startNext()
     {
         if (files.size() > 0)
         {
-            File file = files.get(0);
+            File file = new File(files.get(0).getTargetFile());
             if (logger.isDebugEnabled())
               logger.debug("Streaming " + file.length() + " length file " + 
file + " ...");
             MessagingService.instance.stream(file.getAbsolutePath(), 0L, 
file.length(), FBUtilities.getLocalAddress(), to);
@@ -93,7 +115,9 @@
         if (logger.isDebugEnabled())
           logger.debug("Deleting file " + file + " after streaming " + 
f.length() + "/" + totalBytes + " bytes.");
         FileUtils.delete(file);
-        files.remove(0);
+        PendingFile pf = files.remove(0);
+        if (pf != null)
+            fileMap.remove(pf.getTargetFile());
         if (files.size() > 0)
         {
             startNext();
@@ -117,4 +141,29 @@
             throw new AssertionError(e);
         }
     }
+
+    List<PendingFile> getFiles()
+    {
+        return Collections.unmodifiableList(files);
+    }
+
+    public class StreamFile extends File
+    {
+        private long ptr = 0;
+        public StreamFile(String path)
+        {
+            super(path);
+            ptr = 0;
+        }
+
+        private void update(long ptr)
+        {
+            this.ptr = ptr;
+        }
+
+        public long getPtr()
+        {
+            return ptr;
+        }
+    }
 }

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=908682&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
 Wed Feb 10 22:16:14 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.apache.cassandra.streaming.StreamingServiceMBean;
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class StreamingService implements StreamingServiceMBean
+{
+    private static final Logger logger = 
Logger.getLogger(StreamingService.class);
+    public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.streaming:type=StreamingService";
+    public static final StreamingService instance = new StreamingService();
+
+    private StreamingService()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_OBJECT_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** hosts receiving outgoing streams. */
+    public Set<InetAddress> getStreamDestinations()
+    {
+        return StreamOutManager.getDestinations();
+    }
+
+    /** outgoing streams */
+    public List<String> getOutgoingFiles(String host) throws IOException
+    {
+        List<String> files = new ArrayList<String>();
+        // first, verify that host is a destination. calling 
StreamOutManager.get will put it in the collection
+        // leading to false positives in the future.
+        Set<InetAddress> existingDestinations = getStreamDestinations();
+        InetAddress dest = InetAddress.getByName(host);
+        if (!existingDestinations.contains(dest))
+            return files;
+        
+        StreamOutManager manager = StreamOutManager.get(dest);
+        for (PendingFile f : manager.getFiles())
+            files.add(String.format("%s %d/%d", f.getTargetFile(), f.getPtr(), 
f.getExpectedBytes()));
+        return files;
+    }
+
+    /** hosts sending incoming streams */
+    public Set<InetAddress> getStreamSources()
+    {
+        return StreamInManager.getSources();
+    }
+
+    /** details about incoming streams. */
+    public List<String> getIncomingFiles(String host) throws IOException
+    {
+        List<String> files = new ArrayList<String>();
+        for (PendingFile pf : 
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
+        {
+            files.add(String.format("%s: %s %d/%d", pf.getTable(), 
pf.getTargetFile(), pf.getPtr(), pf.getExpectedBytes()));
+        }
+        return files;
+    }
+}

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java?rev=908682&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
 Wed Feb 10 22:16:14 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Set;
+
+public interface StreamingServiceMBean
+{
+    /** hosts receiving outgoing streams */
+    public Set<InetAddress> getStreamDestinations();
+
+    /** outgoing streams */
+    public List<String> getOutgoingFiles(String host) throws IOException;
+
+    /** hosts sending incoming streams. */
+    public Set<InetAddress> getStreamSources();
+
+    /** details about incoming streams */
+    public List<String> getIncomingFiles(String host) throws IOException;
+}

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java 
Wed Feb 10 22:16:14 2010
@@ -3,9 +3,11 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.management.MemoryUsage;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -20,6 +22,7 @@
 import org.apache.cassandra.dht.Range;
 
 import org.apache.commons.cli.*;
+import org.apache.commons.lang.StringUtils;
 
 public class NodeCmd {
     private static final String HOST_OPT_LONG = "host";
@@ -55,7 +58,8 @@
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, 
snapshot [snapshotname], clearsnapshot, " +
                 "tpstats, flush, repair, decommission, move, loadbalance, 
removetoken, " +
                 "setcachecapacity <keyspace> <cfname> <keycachecapacity> 
<rowcachecapacity>, " +
-                "getcompactionthreshold, setcompactionthreshold [minthreshold] 
([maxthreshold])");
+                "getcompactionthreshold, setcompactionthreshold [minthreshold] 
([maxthreshold])" +
+                "streams [host]");
         String usage = String.format("java %s --host <arg> <command>%n", 
NodeCmd.class.getName());
         hf.printHelp(usage, "", options, header);
     }
@@ -173,6 +177,60 @@
         double memMax = (double)heapUsage.getMax() / (1024 * 1024);
         outs.println(String.format("%-17s: %.2f / %.2f", "Heap Memory (MB)", 
memUsed, memMax));
     }
+
+    public void printStreamInfo(final InetAddress addr, PrintStream outs)
+    {
+        outs.println(String.format("Mode: %s", probe.getOperationMode()));
+        Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations() 
: new HashSet<InetAddress>(){{add(addr);}};
+        if (hosts.size() == 0)
+            outs.println("Not sending any streams.");
+        for (InetAddress host : hosts)
+        {
+            try
+            {
+                List<String> files = probe.getFilesDestinedFor(host);
+                if (files.size() > 0)
+                {
+                    outs.println(String.format("Streaming to: %s", host));
+                    for (String file : files)
+                        outs.println(String.format("   %s", file));
+                }
+                else
+                {
+                    outs.println(String.format(" Nothing streaming to %s", 
host));
+                }
+            }
+            catch (IOException ex)
+            {
+                outs.println(String.format("   Error retrieving file data for 
%s", host));
+            }
+        }
+
+        hosts = addr == null ? probe.getStreamSources() : new 
HashSet<InetAddress>(){{add(addr); }};
+        if (hosts.size() == 0)
+            outs.println("Not receiving any streams.");
+        for (InetAddress host : hosts)
+        {
+            try
+            {
+                List<String> files = probe.getIncomingFiles(host);
+                if (files.size() > 0)
+                {
+                    outs.println(String.format("Streaming from: %s", host));
+                    for (String file : files)
+                        outs.println(String.format("   %s", file));
+                }
+                else
+                {
+                    outs.println(String.format(" Nothing streaming from %s", 
host));
+                }
+            }
+            catch (IOException ex)
+            {
+                outs.println(String.format("   Error retrieving file data for 
%s", host));
+            }
+        }
+    }
     
     public void printColumnFamilyStats(PrintStream outs)
     {
@@ -466,6 +524,11 @@
             }
             probe.setCompactionThreshold(minthreshold, maxthreshold);
         }
+        else if (cmdName.equals("streams"))
+        {
+            String otherHost = arguments.length > 1 ? arguments[1] : null;
+            nodeCmd.printStreamInfo(otherHost == null ? null : 
InetAddress.getByName(otherHost), System.out);
+        }
         else
         {
             System.err.println("Unrecognized command: " + cmdName + ".");

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=908682&r1=908681&r2=908682&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java 
Wed Feb 10 22:16:14 2010
@@ -47,6 +47,8 @@
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.streaming.StreamingService;
+import org.apache.cassandra.streaming.StreamingServiceMBean;
 
 /**
  * JMX client operations for Cassandra.
@@ -64,6 +66,7 @@
     private MemoryMXBean memProxy;
     private RuntimeMXBean runtimeProxy;
     private CompactionManagerMBean mcmProxy;
+    private StreamingServiceMBean streamProxy;
     
     /**
      * Creates a NodeProbe using the specified JMX host and port.
@@ -109,6 +112,8 @@
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
StorageServiceMBean.class);
             name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
             mcmProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
CompactionManagerMBean.class);
+            name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
+            streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
StreamingServiceMBean.class);
         } catch (MalformedObjectNameException e)
         {
             throw new RuntimeException(
@@ -400,6 +405,31 @@
     {
         return ssProxy.getNaturalEndpoints(key, table);
     }
+
+    public Set<InetAddress> getStreamDestinations()
+    {
+        return streamProxy.getStreamDestinations();
+    }
+
+    public List<String> getFilesDestinedFor(InetAddress host) throws 
IOException
+    {
+        return streamProxy.getOutgoingFiles(host.getHostAddress());
+    }
+
+    public Set<InetAddress> getStreamSources()
+    {
+        return streamProxy.getStreamSources();
+    }
+
+    public List<String> getIncomingFiles(InetAddress host) throws IOException
+    {
+        return streamProxy.getIncomingFiles(host.getHostAddress());
+    }
+
+    public String getOperationMode()
+    {
+        return ssProxy.getOperationMode();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>


Reply via email to