Author: jbellis
Date: Sat Jun 19 00:58:15 2010
New Revision: 956168
URL: http://svn.apache.org/viewvc?rev=956168&view=rev
Log:
clean up PendingFile. patch by jbellis; reviewed by Stu Hood for CASSANDRA-1208
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Sat Jun 19 00:58:15 2010
@@ -89,14 +89,14 @@ public class FileStreamTask extends Wrap
assert buffer.remaining() == 0;
// stream sections of the file as returned by PendingFile.currentSection
- Pair<Long,Long> section;
- while ((section = file.currentSection()) != null)
+ for (Pair<Long, Long> section : file.sections)
{
- long length = Math.min(CHUNK_SIZE, section.right -
section.left);
- long bytesTransferred = fc.transferTo(section.left, length,
channel);
+ long length = section.right - section.left;
+ long bytesTransferred = 0;
+ while (bytesTransferred < length)
+ bytesTransferred += fc.transferTo(section.left +
bytesTransferred, length - bytesTransferred, channel);
if (logger.isDebugEnabled())
logger.debug("Bytes transferred " + bytesTransferred);
- file.update(section.left + bytesTransferred);
}
}
finally
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Sat Jun 19 00:58:15 2010
@@ -19,7 +19,6 @@
package org.apache.cassandra.streaming;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@@ -55,10 +54,10 @@ class FileStatusHandler
"Unknown stream action: " + streamStatus.getAction();
// file was successfully streamed
- Descriptor desc = pendingFile.getDescriptor();
+ Descriptor desc = pendingFile.desc;
try
{
- SSTableReader sstable =
SSTableWriter.recoverAndOpen(pendingFile.getDescriptor());
+ SSTableReader sstable =
SSTableWriter.recoverAndOpen(pendingFile.desc);
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
logger.info("Streaming added " + sstable);
}
@@ -76,7 +75,7 @@ class FileStatusHandler
// if all files have been received from this host, remove from bootstrap sources
if (StreamInManager.isDone(host) &&
StorageService.instance.isBootstrapMode())
{
- StorageService.instance.removeBootstrapSource(host,
pendingFile.getDescriptor().ksname);
+ StorageService.instance.removeBootstrapSource(host,
pendingFile.desc.ksname);
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Sat Jun 19 00:58:15 2010
@@ -27,6 +27,7 @@ import java.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.FileStreamTask;
import org.apache.cassandra.utils.Pair;
@@ -62,15 +63,13 @@ public class IncomingStreamReader
long offset = 0;
try
{
- Pair<Long,Long> section;
- while ((section = pendingFile.currentSection()) != null)
+ for (Pair<Long, Long> section : pendingFile.sections)
{
- long length = Math.min(FileStreamTask.CHUNK_SIZE,
section.right - section.left);
- long bytesRead = fc.transferFrom(socketChannel, offset,
length);
- // offset in the remote file
- pendingFile.update(section.left + bytesRead);
- // offset in the local file
- offset += bytesRead;
+ long length = section.right - section.left;
+ long bytesRead = 0;
+ while (bytesRead < length)
+ bytesRead += fc.transferFrom(socketChannel, offset +
bytesRead, length - bytesRead);
+ offset += length;
}
}
catch (IOException ex)
@@ -80,10 +79,7 @@ public class IncomingStreamReader
streamStatus.setAction(FileStatus.Action.STREAM);
handleFileStatus(remoteAddress.getAddress());
/* Delete the orphaned file. */
- File file = new File(pendingFile.getFilename());
- file.delete();
- /* Reset our state. */
- pendingFile.update(0);
+ FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
throw ex;
}
finally
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Sat Jun 19 00:58:15 2010
@@ -48,10 +48,9 @@ public class PendingFile
return serializer_;
}
- private final Descriptor desc;
- private final String component;
- private final List<Pair<Long,Long>> sections;
- private long ptr;
+ public final Descriptor desc;
+ public final String component;
+ public final List<Pair<Long,Long>> sections;
public PendingFile(Descriptor desc, PendingFile pf)
{
@@ -63,36 +62,8 @@ public class PendingFile
this.desc = desc;
this.component = component;
this.sections = sections;
- ptr = 0;
}
- public void update(long ptr)
- {
- this.ptr = ptr;
- }
-
- /**
- * @return The current section of the file, as an (offset,end) pair, or null if nothing left to stream.
- */
- public Pair<Long,Long> currentSection()
- {
- // linear search for the first appropriate section
- for (Pair<Long,Long> section : sections)
- if (ptr < section.right)
- return new Pair<Long,Long>(Long.valueOf(Math.max(ptr,
section.left)), section.right);
- return null;
- }
-
- public String getComponent()
- {
- return component;
- }
-
- public Descriptor getDescriptor()
- {
- return desc;
- }
-
public String getFilename()
{
return desc.filenameFor(component);
@@ -114,7 +85,7 @@ public class PendingFile
public String toString()
{
- return getFilename() + ":" + ptr + "/" + sections;
+ return getFilename() + "/" + sections;
}
private static class PendingFileSerializer implements
ICompactSerializer<PendingFile>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Sat Jun 19 00:58:15 2010
@@ -105,7 +105,7 @@ public class StreamInitiateVerbHandler i
LinkedHashMap<PendingFile, PendingFile> mapping = new
LinkedHashMap<PendingFile, PendingFile>();
for (PendingFile remote : remoteFiles)
{
- Descriptor remotedesc = remote.getDescriptor();
+ Descriptor remotedesc = remote.desc;
// new local sstable
Table table = Table.open(remotedesc.ksname);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Sat Jun 19 00:58:15 2010
@@ -116,19 +116,6 @@ public class StreamOutManager
fileMap.put(pendingFile.getFilename(), pendingFile);
}
}
-
- /**
- * An (offset,end) pair representing the current section of the file to stream.
- */
- public Pair<Long,Long> currentSection(String path)
- {
- return fileMap.get(path).currentSection();
- }
-
- public void update(String path, long pos)
- {
- fileMap.get(path).update(pos);
- }
public void startNext()
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Sat Jun 19 00:58:15 2010
@@ -108,7 +108,7 @@ public class StreamingService implements
List<String> files = new ArrayList<String>();
for (PendingFile pf :
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
{
- files.add(String.format("%s: %s", pf.getDescriptor().ksname,
pf.toString()));
+ files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
}
return files;
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Sat Jun 19 00:58:15 2010
@@ -51,9 +51,9 @@ public class BootstrapTest extends Schem
assert !inContext.getFilename().equals(outContext.getFilename());
// nothing else should
- assertEquals(inContext.getComponent(), outContext.getComponent());
- assertEquals(inContext.getDescriptor().ksname,
outContext.getDescriptor().ksname);
- assertEquals(inContext.getDescriptor().cfname,
outContext.getDescriptor().cfname);
+ assertEquals(inContext.component, outContext.component);
+ assertEquals(inContext.desc.ksname, outContext.desc.ksname);
+ assertEquals(inContext.desc.cfname, outContext.desc.cfname);
}
}
}