Author: gdusbabek
Date: Mon Sep 27 09:04:46 2010
New Revision: 1001629
URL: http://svn.apache.org/viewvc?rev=1001629&view=rev
Log:
add progress to streams. patch by Nick Bailey, reviewed by Gary Dusbabek.
CASSANDRA-1489
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon
Sep 27 09:04:46 2010
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.cassandra.net;
+package org.apache.cassandra.streaming;
import java.io.*;
import java.net.InetAddress;
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -38,13 +39,15 @@ public class FileStreamTask extends Wrap
{
private static Logger logger = LoggerFactory.getLogger(
FileStreamTask.class );
+ // 10MB chunks
+ public static final int CHUNK_SIZE = 10*1024*1024;
// around 10 minutes at the default rpctimeout
public static final int MAX_CONNECT_ATTEMPTS = 8;
private final StreamHeader header;
private final InetAddress to;
- FileStreamTask(StreamHeader header, InetAddress to)
+ public FileStreamTask(StreamHeader header, InetAddress to)
{
this.header = header;
this.to = to;
@@ -94,9 +97,14 @@ public class FileStreamTask extends Wrap
long length = section.right - section.left;
long bytesTransferred = 0;
while (bytesTransferred < length)
- bytesTransferred += fc.transferTo(section.left +
bytesTransferred, length - bytesTransferred, channel);
+ {
+ long toTransfer = Math.min(CHUNK_SIZE, length -
bytesTransferred);
+ long lastWrite = fc.transferTo(section.left +
bytesTransferred, toTransfer, channel);
+ bytesTransferred += lastWrite;
+ header.file.progress += lastWrite;
+ }
if (logger.isDebugEnabled())
- logger.debug("Bytes transferred " + bytesTransferred);
+ logger.debug("Bytes transferred " + bytesTransferred + "/"
+ header.file.size);
}
}
finally
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon
Sep 27 09:04:46 2010
@@ -46,6 +46,7 @@ import org.apache.cassandra.io.util.Data
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.GuidGenerator;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Mon Sep 27 09:04:46 2010
@@ -50,6 +50,8 @@ public class IncomingStreamReader
session = StreamInSession.get(remoteAddress.getAddress(),
header.sessionId);
session.addFiles(header.pendingFiles);
+ // set the current file we are streaming so progress shows up in jmx
+ session.setCurrentFile(header.file);
session.setTable(header.table);
// pendingFile gets the new context for the local node.
remoteFile = header.file;
@@ -82,7 +84,12 @@ public class IncomingStreamReader
long length = section.right - section.left;
long bytesRead = 0;
while (bytesRead < length)
- bytesRead += fc.transferFrom(socketChannel, offset +
bytesRead, length - bytesRead);
+ {
+ long toRead = Math.min(FileStreamTask.CHUNK_SIZE, length -
bytesRead);
+ long lastRead = fc.transferFrom(socketChannel, offset +
bytesRead, toRead);
+ bytesRead += lastRead;
+ remoteFile.progress += lastRead;
+ }
offset += length;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
Mon Sep 27 09:04:46 2010
@@ -48,6 +48,8 @@ public class PendingFile
public final Descriptor desc;
public final String component;
public final List<Pair<Long,Long>> sections;
+ public final long size;
+ public long progress;
public PendingFile(Descriptor desc, PendingFile pf)
{
@@ -59,6 +61,12 @@ public class PendingFile
this.desc = desc;
this.component = component;
this.sections = sections;
+ long tempSize = 0;
+ for(Pair<Long,Long> section : sections)
+ {
+ tempSize += section.right - section.left;
+ }
+ size = tempSize;
}
public String getFilename()
@@ -82,7 +90,7 @@ public class PendingFile
public String toString()
{
- return getFilename() + "/" + StringUtils.join(sections, ",");
+ return getFilename() + "/" + StringUtils.join(sections, ",") + "\n\t
progress=" + progress + "/" + size + " - " + progress*100/size + "%";
}
public static class PendingFileSerializer implements
ICompactSerializer<PendingFile>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1001629&r1=1001628&r2=1001629&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
Mon Sep 27 09:04:46 2010
@@ -49,6 +49,7 @@ public class StreamInSession
private String table;
private final List<Future<SSTableReader>> buildFutures = new
ArrayList<Future<SSTableReader>>();
private ColumnFamilyStore cfs;
+ private PendingFile current;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
{
@@ -80,6 +81,11 @@ public class StreamInSession
return session;
}
+ public void setCurrentFile(PendingFile file)
+ {
+ this.current = file;
+ }
+
public void setTable(String table)
{
this.table = table;
@@ -106,6 +112,8 @@ public class StreamInSession
buildFutures.add(future);
files.remove(remoteFile);
+ if (remoteFile.equals(current))
+ current = null;
StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_FINISHED);
// send a StreamStatus message telling the source node it can delete
this file
MessagingService.instance.sendOneWay(reply.createMessage(), getHost());
@@ -179,17 +187,20 @@ public class StreamInSession
}
/** query the status of incoming files. */
- public static List<PendingFile> getIncomingFiles(InetAddress host)
+ public static Set<PendingFile> getIncomingFiles(InetAddress host)
{
- List<PendingFile> list = new ArrayList<PendingFile>();
+ Set<PendingFile> set = new HashSet<PendingFile>();
for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry :
sessions.entrySet())
{
if (entry.getKey().left.equals(host))
{
StreamInSession session = entry.getValue();
- list.addAll(session.files);
+ set.addAll(session.files);
+ if(session.current != null) {
+ set.add(session.current);
+ }
}
}
- return list;
+ return set;
}
}