Author: jbellis
Date: Sat Jun 19 00:58:15 2010
New Revision: 956168

URL: http://svn.apache.org/viewvc?rev=956168&view=rev
Log:
clean up PendingFile.  patch by jbellis; reviewed by Stu Hood for CASSANDRA-1208

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Sat 
Jun 19 00:58:15 2010
@@ -89,14 +89,14 @@ public class FileStreamTask extends Wrap
             assert buffer.remaining() == 0;
             
             // stream sections of the file as returned by 
PendingFile.currentSection
-            Pair<Long,Long> section;
-            while ((section = file.currentSection()) != null)
+            for (Pair<Long, Long> section : file.sections)
             {
-                long length = Math.min(CHUNK_SIZE, section.right - 
section.left);
-                long bytesTransferred = fc.transferTo(section.left, length, 
channel);
+                long length = section.right - section.left;
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                    bytesTransferred += fc.transferTo(section.left + 
bytesTransferred, length - bytesTransferred, channel);
                 if (logger.isDebugEnabled())
                     logger.debug("Bytes transferred " + bytesTransferred);
-                file.update(section.left + bytesTransferred);
             }
         }
         finally

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java 
Sat Jun 19 00:58:15 2010
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 
@@ -55,10 +54,10 @@ class FileStatusHandler
             "Unknown stream action: " + streamStatus.getAction();
 
         // file was successfully streamed
-        Descriptor desc = pendingFile.getDescriptor();
+        Descriptor desc = pendingFile.desc;
         try
         {
-            SSTableReader sstable = 
SSTableWriter.recoverAndOpen(pendingFile.getDescriptor());
+            SSTableReader sstable = 
SSTableWriter.recoverAndOpen(pendingFile.desc);
             
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
             logger.info("Streaming added " + sstable);
         }
@@ -76,7 +75,7 @@ class FileStatusHandler
         // if all files have been received from this host, remove from 
bootstrap sources
         if (StreamInManager.isDone(host) && 
StorageService.instance.isBootstrapMode())
         {
-            StorageService.instance.removeBootstrapSource(host, 
pendingFile.getDescriptor().ksname);
+            StorageService.instance.removeBootstrapSource(host, 
pendingFile.desc.ksname);
         }
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
 Sat Jun 19 00:58:15 2010
@@ -27,6 +27,7 @@ import java.io.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.FileStreamTask;
 import org.apache.cassandra.utils.Pair;
 
@@ -62,15 +63,13 @@ public class IncomingStreamReader
         long offset = 0;
         try
         {
-            Pair<Long,Long> section;
-            while ((section = pendingFile.currentSection()) != null)
+            for (Pair<Long, Long> section : pendingFile.sections)
             {
-                long length = Math.min(FileStreamTask.CHUNK_SIZE, 
section.right - section.left);
-                long bytesRead = fc.transferFrom(socketChannel, offset, 
length);
-                // offset in the remote file
-                pendingFile.update(section.left + bytesRead);
-                // offset in the local file
-                offset += bytesRead;
+                long length = section.right - section.left;
+                long bytesRead = 0;
+                while (bytesRead < length)
+                    bytesRead += fc.transferFrom(socketChannel, offset + 
bytesRead, length - bytesRead);
+                offset += length;
             }
         }
         catch (IOException ex)
@@ -80,10 +79,7 @@ public class IncomingStreamReader
             streamStatus.setAction(FileStatus.Action.STREAM);
             handleFileStatus(remoteAddress.getAddress());
             /* Delete the orphaned file. */
-            File file = new File(pendingFile.getFilename());
-            file.delete();
-            /* Reset our state. */
-            pendingFile.update(0);
+            FileUtils.deleteWithConfirm(new File(pendingFile.getFilename()));
             throw ex;
         }
         finally

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java 
Sat Jun 19 00:58:15 2010
@@ -48,10 +48,9 @@ public class PendingFile
         return serializer_;
     }
 
-    private final Descriptor desc;
-    private final String component;
-    private final List<Pair<Long,Long>> sections;
-    private long ptr;
+    public final Descriptor desc;
+    public final String component;
+    public final List<Pair<Long,Long>> sections;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
@@ -63,36 +62,8 @@ public class PendingFile
         this.desc = desc;
         this.component = component;
         this.sections = sections;
-        ptr = 0;
     }
 
-    public void update(long ptr)
-    {
-        this.ptr = ptr;
-    }
-
-    /**
-     * @return The current section of the file, as an (offset,end) pair, or 
null if nothing left to stream.
-     */
-    public Pair<Long,Long> currentSection()
-    {
-        // linear search for the first appropriate section
-        for (Pair<Long,Long> section : sections)
-            if (ptr < section.right)
-                return new Pair<Long,Long>(Long.valueOf(Math.max(ptr, 
section.left)), section.right);
-        return null;
-    }
-
-    public String getComponent()
-    {
-        return component;
-    }
-
-    public Descriptor getDescriptor()
-    {
-        return desc;
-    }
-    
     public String getFilename()
     {
         return desc.filenameFor(component);
@@ -114,7 +85,7 @@ public class PendingFile
 
     public String toString()
     {
-        return getFilename() + ":" + ptr + "/" + sections;
+        return getFilename() + "/" + sections;
     }
 
     private static class PendingFileSerializer implements 
ICompactSerializer<PendingFile>

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
 Sat Jun 19 00:58:15 2010
@@ -105,7 +105,7 @@ public class StreamInitiateVerbHandler i
         LinkedHashMap<PendingFile, PendingFile> mapping = new 
LinkedHashMap<PendingFile, PendingFile>();
         for (PendingFile remote : remoteFiles)
         {
-            Descriptor remotedesc = remote.getDescriptor();
+            Descriptor remotedesc = remote.desc;
 
             // new local sstable
             Table table = Table.open(remotedesc.ksname);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java 
Sat Jun 19 00:58:15 2010
@@ -116,19 +116,6 @@ public class StreamOutManager
             fileMap.put(pendingFile.getFilename(), pendingFile);
         }
     }
-
-    /**
-     * An (offset,end) pair representing the current section of the file to 
stream.
-     */
-    public Pair<Long,Long> currentSection(String path)
-    {
-        return fileMap.get(path).currentSection();
-    }
-
-    public void update(String path, long pos)
-    {
-        fileMap.get(path).update(pos);
-    }
     
     public void startNext()
     {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java 
Sat Jun 19 00:58:15 2010
@@ -108,7 +108,7 @@ public class StreamingService implements
         List<String> files = new ArrayList<String>();
         for (PendingFile pf : 
StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
         {
-            files.add(String.format("%s: %s", pf.getDescriptor().ksname, 
pf.toString()));
+            files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
         }
         return files;
     }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=956168&r1=956167&r2=956168&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java 
Sat Jun 19 00:58:15 2010
@@ -51,9 +51,9 @@ public class BootstrapTest extends Schem
             assert !inContext.getFilename().equals(outContext.getFilename());
 
             // nothing else should
-            assertEquals(inContext.getComponent(), outContext.getComponent());
-            assertEquals(inContext.getDescriptor().ksname, 
outContext.getDescriptor().ksname);
-            assertEquals(inContext.getDescriptor().cfname, 
outContext.getDescriptor().cfname);
+            assertEquals(inContext.component, outContext.component);
+            assertEquals(inContext.desc.ksname, outContext.desc.ksname);
+            assertEquals(inContext.desc.cfname, outContext.desc.cfname);
         }
     }
 }


Reply via email to