Author: slebresne
Date: Mon May 23 12:51:03 2011
New Revision: 1126477

URL: http://svn.apache.org/viewvc?rev=1126477&view=rev
Log:
Add sstable bulk loading utility
patch by slebresne; reviewed by jbellis for CASSANDRA-1278

Added:
    cassandra/branches/cassandra-0.8/bin/sstableloader
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
    
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon May 23 12:51:03 2011
@@ -17,6 +17,7 @@
  * Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659)
  * Assert ranges are not overlapping in AB.normalize (CASSANDRA-2641)
  * Don't write CounterUpdateColumn to disk in tests (CASSANDRA-2650)
+ * Add sstable bulk loading utility (CASSANDRA-1278)
 
 
 0.8.0-final

Added: cassandra/branches/cassandra-0.8/bin/sstableloader
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/bin/sstableloader?rev=1126477&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/bin/sstableloader (added)
+++ cassandra/branches/cassandra-0.8/bin/sstableloader Mon May 23 12:51:03 2011
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Source the first readable cassandra.in.sh, unless CASSANDRA_INCLUDE
+# already names the file to use. All expansions are quoted so that paths
+# containing whitespace are handled as a single word.
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "$(dirname "$0")/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA=`which java`
+fi
+
+# CLASSPATH is quoted: unquoted, a value with spaces makes the test itself fail
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+"$JAVA" -ea -cp "$CLASSPATH" -Xmx256M \
+        -Dlog4j.configuration=log4j-tools.properties \
+        org.apache.cassandra.tools.BulkLoader "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
 Mon May 23 12:51:03 2011
@@ -85,9 +85,10 @@ public abstract class SSTable
 
     protected SSTable(Descriptor descriptor, Set<Component> components, 
CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, 
EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
     {
+        // In almost all cases, metadata shouldn't be null, but allowing null makes it possible to create a
+        // mostly functional SSTable without a full schema definition. SSTableLoader uses that ability.
         assert descriptor != null;
         assert components != null;
-        assert metadata != null;
         assert replayPosition != null;
         assert partitioner != null;
         assert rowSizes != null;

Added: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1126477&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
 (added)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
 Mon May 23 12:51:03 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Cassandra SSTable bulk loader.
+ * Load an externally created sstable into a cluster.
+ */
+public class SSTableLoader
+{
+    private final File directory;
+    private final String keyspace;
+    private final Client client;
+    private final OutputHandler outputHandler;
+
+    /**
+     * Creates a loader for the sstables contained in {@code directory}.
+     * The name of that directory is taken as the name of the keyspace to load into.
+     *
+     * @param directory     directory holding the sstable files to stream
+     * @param client        used to validate column families and to get the endpoint/range map
+     * @param outputHandler receives the messages produced while loading
+     */
+    public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
+    {
+        this.client = client;
+        this.outputHandler = outputHandler;
+        this.directory = directory;
+        this.keyspace = this.directory.getName();
+    }
+
+    /**
+     * Walks the files of {@code directory} and opens a reader for every data file that:
+     * is not temporary, has its primary index file next to it, and belongs to a column
+     * family the client knows about. Files failing one of those checks are skipped, with
+     * a message sent to the output handler for the last two checks and for open errors.
+     *
+     * @return the readers that could be opened (possibly empty)
+     */
+    private Collection<SSTableReader> openSSTables()
+    {
+        final List<SSTableReader> opened = new LinkedList<SSTableReader>();
+
+        // The filter is used as a visitor only: it never accepts a name, it just
+        // records the readers it manages to open in 'opened'.
+        FilenameFilter collector = new FilenameFilter()
+        {
+            public boolean accept(File dir, String name)
+            {
+                Pair<Descriptor, Component> parsed = SSTable.tryComponentFromFilename(dir, name);
+                if (parsed == null)
+                    return false;
+
+                Descriptor desc = parsed.left;
+                boolean isData = parsed.right.equals(Component.DATA);
+                if (!isData || desc.temporary)
+                    return false;
+
+                File indexFile = new File(desc.filenameFor(Component.PRIMARY_INDEX));
+                if (!indexFile.exists())
+                {
+                    outputHandler.output(String.format("Skipping file %s because index is missing", name));
+                    return false;
+                }
+
+                boolean knownCf = client.validateColumnFamily(keyspace, desc.cfname);
+                if (!knownCf)
+                {
+                    outputHandler.output(String.format("Skipping file %s: column family %s.%s doesn't exist", name, keyspace, desc.cfname));
+                    return false;
+                }
+
+                // Only the data and index components are declared; no schema metadata is passed (null)
+                Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.PRIMARY_INDEX));
+
+                try
+                {
+                    SSTableReader reader = SSTableReader.open(desc, components, null, StorageService.getPartitioner());
+                    opened.add(reader);
+                }
+                catch (IOException e)
+                {
+                    outputHandler.output(String.format("Skipping file %s, error opening it: %s", name, e.getMessage()));
+                }
+                return false;
+            }
+        };
+
+        directory.list(collector);
+        return opened;
+    }
+
+    /**
+     * Streams the sstables to every endpoint of the endpoint/range map, ignoring none.
+     *
+     * @return a future tracking the streaming sessions
+     * @throws IOException propagated from {@code stream(Set)}
+     */
+    public LoaderFuture stream() throws IOException
+    {
+        Set<InetAddress> nothingIgnored = Collections.<InetAddress>emptySet();
+        return stream(nothingIgnored);
+    }
+
+    /**
+     * Initializes the client, opens the sstables of the directory and starts one
+     * streaming session per endpoint of the client's endpoint/range map.
+     *
+     * @param toIgnore endpoints for which no session must be created; they are
+     *                 counted as already completed in the returned future
+     * @return a future that completes once every session has called back; its
+     *         pending files map holds the files queued for each streamed endpoint
+     * @throws IOException declared for streaming errors
+     */
+    public LoaderFuture stream(Set<InetAddress> toIgnore) throws IOException
+    {
+        client.init();
+
+        Collection<SSTableReader> sstables = openSSTables();
+        if (sstables.isEmpty())
+        {
+            outputHandler.output("No sstables to stream");
+            return new LoaderFuture(0);
+        }
+
+        Map<InetAddress, Collection<Range>> endpointToRanges = client.getEndpointToRangesMap();
+        // names() terminates every file name with a space, hence "%sto"
+        outputHandler.output(String.format("Streaming relevant part of %sto %s", names(sstables), endpointToRanges.keySet()));
+
+        // There will be one streaming session by endpoint
+        LoaderFuture future = new LoaderFuture(endpointToRanges.size());
+        for (Map.Entry<InetAddress, Collection<Range>> entry : endpointToRanges.entrySet())
+        {
+            InetAddress remote = entry.getKey();
+            if (toIgnore.contains(remote))
+            {
+                // No session for this endpoint: release its slot in the latch right away
+                future.latch.countDown();
+                continue;
+            }
+            Collection<Range> ranges = entry.getValue();
+            StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future.latch, remote));
+            StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD);
+            future.setPendings(remote, session.getFiles());
+        }
+        return future;
+    }
+
+    /**
+     * Future tracking the streaming sessions started by {@code stream}.
+     * It is done when its latch, initialized with the number of sessions,
+     * reaches zero. Cancellation is not supported.
+     */
+    public static class LoaderFuture implements Future<Void>
+    {
+        final CountDownLatch latch;
+        final Map<InetAddress, Collection<PendingFile>> pendingFiles;
+
+        private LoaderFuture(int request)
+        {
+            latch = new CountDownLatch(request);
+            pendingFiles = new HashMap<InetAddress, Collection<PendingFile>>();
+        }
+
+        // Records a copy of the files queued for the given endpoint
+        private void setPendings(InetAddress remote, Collection<PendingFile> files)
+        {
+            pendingFiles.put(remote, new ArrayList<PendingFile>(files));
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            throw new UnsupportedOperationException("Cancellation is not yet supported");
+        }
+
+        /**
+         * Blocks until every session has completed.
+         *
+         * @return always {@code null}
+         * @throws InterruptedException if the waiting thread is interrupted
+         */
+        public Void get() throws InterruptedException
+        {
+            latch.await();
+            return null;
+        }
+
+        /**
+         * Blocks until every session has completed or the timeout elapses.
+         *
+         * @return always {@code null}
+         * @throws InterruptedException if the waiting thread is interrupted
+         * @throws java.util.concurrent.TimeoutException if sessions are still
+         *         outstanding when the timeout elapses, as required by {@link Future}
+         */
+        public Void get(long timeout, TimeUnit unit) throws InterruptedException, java.util.concurrent.TimeoutException
+        {
+            // await returns false when the wait timed out before the count reached zero
+            if (!latch.await(timeout, unit))
+                throw new java.util.concurrent.TimeoutException(String.format("Streaming not completed after %d %s (%d outstanding sessions)", timeout, unit, latch.getCount()));
+            return null;
+        }
+
+        public boolean isCancelled()
+        {
+            // For now, cancellation is not supported, maybe one day...
+            return false;
+        }
+
+        public boolean isDone()
+        {
+            return latch.getCount() == 0;
+        }
+
+        /**
+         * @return the map of files queued per endpoint (the internal map, not a copy)
+         */
+        public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
+        {
+            return pendingFiles;
+        }
+    }
+
+    /**
+     * Builds the list of data file names of the given sstables, each name
+     * being followed by a single space (including the last one).
+     */
+    private String names(Collection<SSTableReader> sstables)
+    {
+        StringBuilder joined = new StringBuilder();
+        Iterator<SSTableReader> it = sstables.iterator();
+        while (it.hasNext())
+        {
+            String dataFile = it.next().descriptor.filenameFor(Component.DATA);
+            joined.append(dataFile);
+            joined.append(" ");
+        }
+        return joined.toString();
+    }
+
+    /**
+     * Callback handed to a streaming session: counts down the shared latch,
+     * reports it, and stops the client once the latch count is zero.
+     */
+    private class CountDownCallback implements Runnable
+    {
+        private final InetAddress endpoint;
+        private final CountDownLatch latch;
+
+        CountDownCallback(CountDownLatch latch, InetAddress endpoint)
+        {
+            this.endpoint = endpoint;
+            this.latch = latch;
+        }
+
+        public void run()
+        {
+            latch.countDown();
+            String report = String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount());
+            outputHandler.debug(report);
+
+            // Sessions still outstanding: nothing more to do for this one
+            if (latch.getCount() != 0)
+                return;
+
+            // There could be race with stop being called twice but it should be ok
+            client.stop();
+        }
+    }
+
+    /**
+     * Sink for the messages produced by the loader.
+     */
+    public interface OutputHandler
+    {
+        /** Called when an important piece of information needs to be displayed. */
+        void output(String msg);
+
+        /** Called when a less important piece of information needs to be displayed. */
+        void debug(String msg);
+    }
+
+    /**
+     * Cluster-facing part of the loader: knows which column families exist and
+     * which ranges each endpoint is to receive.
+     */
+    public static abstract class Client
+    {
+        private final Map<InetAddress, Collection<Range>> endpointToRanges = new HashMap<InetAddress, Collection<Range>>();
+
+        /**
+         * Initialize the client.
+         * After this method returns:
+         *   * StorageService must be correctly initialized (and so must gossip
+         *     and the messaging service)
+         *   * getEndpointToRangesMap() must return a correct map
+         * This method is guaranteed to be called before any other method of a
+         * client.
+         */
+        public abstract void init();
+
+        /**
+         * Stop the client. Does nothing by default.
+         */
+        public void stop() {}
+
+        /**
+         * Validate that {@code keyspace} is an existing keyspace and that
+         * {@code cfName} is one of its existing column families.
+         */
+        public abstract boolean validateColumnFamily(String keyspace, String cfName);
+
+        /**
+         * @return the endpoint to ranges map (the internal map, not a copy)
+         */
+        public Map<InetAddress, Collection<Range>> getEndpointToRangesMap()
+        {
+            return endpointToRanges;
+        }
+
+        /**
+         * Registers {@code range} as one of the ranges of {@code endpoint}.
+         */
+        protected void addRangeForEndpoint(Range range, InetAddress endpoint)
+        {
+            Collection<Range> known = endpointToRanges.get(endpoint);
+            if (known != null)
+            {
+                known.add(range);
+                return;
+            }
+
+            // First range seen for this endpoint
+            Collection<Range> fresh = new HashSet<Range>();
+            fresh.add(range);
+            endpointToRanges.put(endpoint, fresh);
+        }
+    }
+}

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
 Mon May 23 12:51:03 2011
@@ -150,6 +150,9 @@ public class SSTableReader extends SSTab
     public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData 
metadata, IPartitioner partitioner) throws IOException
     {
         assert partitioner != null;
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA);
+        assert components.contains(Component.PRIMARY_INDEX);
 
         long start = System.currentTimeMillis();
         logger.info("Opening " + descriptor);
@@ -158,7 +161,7 @@ public class SSTableReader extends SSTab
         EstimatedHistogram columnCounts;
         File statsFile = new 
File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
         ReplayPosition rp = ReplayPosition.NONE;
-        if (statsFile.exists())
+        if (components.contains(Component.STATS) && statsFile.exists())
         {
             DataInputStream dis = null;
             try
@@ -249,6 +252,12 @@ public class SSTableReader extends SSTab
 
     void loadBloomFilter() throws IOException
     {
+        if (!components.contains(Component.FILTER))
+        {
+            bf = BloomFilter.emptyFilter();
+            return;
+        }
+
         DataInputStream stream = null;
         try
         {

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
 Mon May 23 12:51:03 2011
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service;
 
+import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -50,6 +51,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
+import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -327,6 +329,8 @@ public class StorageService implements I
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
         MessagingService.instance().shutdown();
+        // give it a second so that task accepted before the MessagingService 
shutdown gets submitted to the stage (to avoid RejectedExecutionException)
+        try { Thread.sleep(1000L); } catch (InterruptedException e) {}
         StageManager.shutdownNow();
     }
     
@@ -354,7 +358,7 @@ public class StorageService implements I
         // sleep a while to allow gossip to warm up (the other nodes need to 
know about this one before they can reply).
         try
         {
-            Thread.sleep(5000L);
+            Thread.sleep(RING_DELAY);
         }
         catch (Exception ex)
         {
@@ -2428,4 +2432,37 @@ public class StorageService implements I
         return new Pair<Set<Range>, Set<Range>>(toStream, toFetch);
     }
 
+    /**
+     * Streams the sstables found in {@code directory} and blocks until the
+     * streaming sessions have completed. Column families are validated
+     * against the local schema and messages go to this class' logger.
+     *
+     * @param directory path of the directory holding the sstables
+     * @throws IllegalArgumentException if the path is not an existing directory
+     * @throws RuntimeException wrapping any exception raised while streaming
+     */
+    public void bulkLoad(String directory)
+    {
+        File dir = new File(directory);
+
+        if (!(dir.exists() && dir.isDirectory()))
+            throw new IllegalArgumentException("Invalid directory " + directory);
+
+        SSTableLoader.OutputHandler logOutput = new SSTableLoader.OutputHandler()
+        {
+            public void output(String msg) { logger_.info(msg); }
+            public void debug(String msg) { logger_.debug(msg); }
+        };
+
+        // Nothing to initialize here; the schema is looked up locally
+        SSTableLoader.Client localClient = new SSTableLoader.Client()
+        {
+            public void init() {}
+
+            public boolean validateColumnFamily(String keyspace, String cfName)
+            {
+                return DatabaseDescriptor.getCFMetaData(keyspace, cfName) != null;
+            }
+        };
+
+        try
+        {
+            SSTableLoader loader = new SSTableLoader(dir, localClient, logOutput);
+            loader.stream().get();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 Mon May 23 12:51:03 2011
@@ -290,4 +290,6 @@ public interface StorageServiceMBean
     public boolean isJoined();
 
     public void setCompactionThroughputMbPerSec(int value);
+
+    public void bulkLoad(String directory);
 }

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
 Mon May 23 12:51:03 2011
@@ -26,6 +26,7 @@ public enum OperationType
     AES,
     BOOTSTRAP,
     UNBOOTSTRAP,
-    RESTORE_REPLICA_COUNT;
+    RESTORE_REPLICA_COUNT,
+    BULK_LOAD;
 }
 

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
 Mon May 23 12:51:03 2011
@@ -134,7 +134,7 @@ public class StreamOutSession
             Thread.sleep(10);
     }
 
-    Collection<PendingFile> getFiles()
+    public Collection<PendingFile> getFiles()
     {
         return files.values();
     }

Added: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java?rev=1126477&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
 (added)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
 Mon May 23 12:51:03 2011
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PendingFile;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.commons.cli.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class BulkLoader
+{
+    private static final String TOOL_NAME = "sstableloader";
+    private static final String VERBOSE_OPTION  = "verbose";
+    private static final String DEBUG_OPTION  = "debug";
+    private static final String HELP_OPTION  = "help";
+    private static final String NOPROGRESS_OPTION  = "no-progress";
+    private static final String IGNORE_NODES_OPTION  = "ignore";
+
+    public static void main(String args[]) throws IOException
+    {
+        LoaderOptions options = LoaderOptions.parseArgs(args);
+        try
+        {
+            SSTableLoader loader = new SSTableLoader(options.directory, new 
ExternalClient(options.directory.getName(), options), options);
+            SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
+
+            if (options.noProgress)
+            {
+                future.get();
+            }
+            else
+            {
+                ProgressIndicator indicator = new 
ProgressIndicator(future.getPendingFiles());
+                indicator.start();
+                System.out.println("");
+                while (!future.isDone())
+                {
+                    if (indicator.printProgress())
+                    {
+                        // We're done with streaming
+                        System.out.println("\nWaiting for targets to rebuild 
indexes ...");
+                        future.get();
+                        assert future.isDone();
+                    }
+                    else
+                    {
+                        try { Thread.sleep(1000L); } catch (Exception e) {}
+                    }
+                }
+            }
+
+            System.exit(0); // We need that to stop non daemonized threads
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            if (options.debug)
+                e.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    // Return true when everything is at 100%
+    static class ProgressIndicator
+    {
+        private final Map<InetAddress, Collection<PendingFile>> filesByHost;
+        private long startTime;
+        private long lastProgress;
+        private long lastTime;
+
+        public ProgressIndicator(Map<InetAddress, Collection<PendingFile>> 
filesByHost)
+        {
+            this.filesByHost = new HashMap<InetAddress, 
Collection<PendingFile>>(filesByHost);
+        }
+
+        public void start()
+        {
+            startTime = System.currentTimeMillis();
+        }
+
+        public boolean printProgress()
+        {
+            boolean done = true;
+            StringBuilder sb = new StringBuilder();
+            sb.append("\rprogress: ");
+            long totalProgress = 0;
+            long totalSize = 0;
+            for (Map.Entry<InetAddress, Collection<PendingFile>> entry : 
filesByHost.entrySet())
+            {
+                long progress = 0;
+                long size = 0;
+                int completed = 0;
+                Collection<PendingFile> pendings = entry.getValue();
+                for (PendingFile f : pendings)
+                {
+                    progress += f.progress;
+                    size += f.size;
+                    if (f.progress == f.size)
+                        completed++;
+                }
+                totalProgress += progress;
+                totalSize += size;
+                if (completed != pendings.size())
+                    done = false;
+                sb.append("[").append(entry.getKey());
+                sb.append(" 
").append(completed).append("/").append(pendings.size());
+                sb.append(" (").append(size == 0 ? 100L : progress * 100L / 
size).append(")] ");
+            }
+            long time = System.currentTimeMillis();
+            long deltaTime = time - lastTime;
+            lastTime = time;
+            long deltaProgress = totalProgress - lastProgress;
+            lastProgress = totalProgress;
+
+            sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress 
* 100L / totalSize).append(" - ");
+            sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
+            sb.append(" (avg: ").append(mbPerSec(totalProgress, time - 
startTime)).append("MB/s)]");;
+            System.out.print(sb.toString());
+            return done;
+        }
+
+        private int mbPerSec(long bytes, long timeInMs)
+        {
+            double bytesPerMs = ((double)bytes) / timeInMs;
+            return (int)((bytesPerMs * 1000) / (1024 * 2024));
+        }
+    }
+
+    static class ExternalClient extends SSTableLoader.Client
+    {
+        private final String keyspace;
+        private final Map<String, Set<String>> knownCfs = new HashMap<String, 
Set<String>>();
+        private final SSTableLoader.OutputHandler outputHandler;
+
+        public ExternalClient(String keyspace, SSTableLoader.OutputHandler 
outputHandler)
+        {
+            super();
+            this.keyspace = keyspace;
+            this.outputHandler = outputHandler;
+        }
+
+        /**
+         * Starts the StorageService client, then queries one live cluster member over
+         * Thrift for the token ring of {@code keyspace} and for all keyspace definitions.
+         * Every (range, endpoint) pair returned by describe_ring() is registered through
+         * {@code addRangeForEndpoint}, and the column family names of each keyspace are
+         * stored in {@code knownCfs}.
+         *
+         * The Thrift connection is only needed for the two describe_* calls and is closed
+         * as soon as they return or throw.
+         *
+         * @throws RuntimeException wrapping any failure: no live member besides the local
+         *         address, a Thrift error, or an endpoint name that cannot be resolved.
+         */
+        public void init()
+        {
+            outputHandler.output(String.format("Starting client (and waiting %d seconds for gossip) ...", StorageService.RING_DELAY / 1000));
+            try
+            {
+                // Init gossip
+                StorageService.instance.initClient();
+
+                Set<InetAddress> hosts = Gossiper.instance.getLiveMembers();
+                hosts.remove(FBUtilities.getLocalAddress());
+                if (hosts.isEmpty())
+                    throw new IllegalStateException("Cannot load any sstable, no live member found in the cluster");
+
+                // Query endpoint to ranges map and schemas from thrift.
+                // InetAddress.toString() is "hostname/address"; stripping the first character
+                // only works when the hostname part is empty, so ask for the address directly.
+                String host = hosts.iterator().next().getHostAddress();
+                int port = DatabaseDescriptor.getRpcPort();
+
+                List<TokenRange> tokenRanges;
+                List<KsDef> ksDefs;
+                Cassandra.Client client = createThriftClient(host, port);
+                try
+                {
+                    tokenRanges = client.describe_ring(keyspace);
+                    ksDefs = client.describe_keyspaces();
+                }
+                finally
+                {
+                    // The client is not used past this point: release its socket.
+                    client.getInputProtocol().getTransport().close();
+                }
+
+                Token.TokenFactory tkFactory = StorageService.getPartitioner().getTokenFactory();
+
+                try
+                {
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range range = new Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException("Got an unknown host from describe_ring()", e);
+                }
+
+                for (KsDef ksDef : ksDefs)
+                {
+                    Set<String> cfs = new HashSet<String>();
+                    for (CfDef cfDef : ksDef.cf_defs)
+                        cfs.add(cfDef.name);
+                    knownCfs.put(ksDef.name, cfs);
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Shuts this client down by delegating to {@code StorageService.instance.stopClient()}.
+         */
+        @Override
+        public void stop()
+        {
+            StorageService.instance.stopClient();
+        }
+
+        /**
+         * Tells whether {@code cfName} is one of the column family names recorded in
+         * {@code knownCfs} for {@code keyspace}.
+         *
+         * @param keyspace keyspace name to look up
+         * @param cfName column family name to check
+         * @return false when the keyspace has no entry or its entry does not contain the name
+         */
+        public boolean validateColumnFamily(String keyspace, String cfName)
+        {
+            Set<String> known = knownCfs.get(keyspace);
+            if (known == null)
+                return false;
+            return known.contains(cfName);
+        }
+
+        /**
+         * Opens a framed transport over a socket to {@code host}:{@code port} and returns
+         * a client speaking the binary protocol on it.
+         *
+         * @param host address of the node to connect to
+         * @param port Thrift RPC port of that node
+         * @return a client bound to the opened transport
+         * @throws TTransportException if the transport cannot be opened
+         */
+        private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
+        {
+            TTransport framed = new TFramedTransport(new TSocket(host, port));
+            framed.open();
+            return new Cassandra.Client(new TBinaryProtocol(framed));
+        }
+    }
+
+    /**
+     * Parsed command line of the loader. Also serves as the
+     * {@link SSTableLoader.OutputHandler} that prints messages on standard output.
+     */
+    static class LoaderOptions implements SSTableLoader.OutputHandler
+    {
+        /** Directory given as the single positional argument. */
+        public final File directory;
+
+        /** Set when DEBUG_OPTION is present (described to the user as "display stack traces"). */
+        public boolean debug;
+        /** Set when VERBOSE_OPTION is present; enables {@link #debug(String)} output. */
+        public boolean verbose;
+        /** Set when NOPROGRESS_OPTION is present. */
+        public boolean noProgress;
+
+        /** Addresses listed with IGNORE_NODES_OPTION. */
+        public Set<InetAddress> ignores = new HashSet<InetAddress>();
+
+        LoaderOptions(File directory)
+        {
+            this.directory = directory;
+        }
+
+        /**
+         * Parses the command line. On any usage error (bad option, missing or extra
+         * positional argument, missing directory, unresolvable node) an error message and
+         * the usage are printed and the JVM exits with status 1; with HELP_OPTION the usage
+         * is printed and the JVM exits with status 0.
+         *
+         * @param cmdArgs raw command line arguments
+         * @return the parsed options
+         */
+        public static LoaderOptions parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length == 0)
+                    errorMsg("Missing sstable directory argument", options);
+
+                if (args.length > 1)
+                    errorMsg("Too many arguments", options);
+
+                String dirname = args[0];
+                File dir = new File(dirname);
+
+                if (!dir.exists())
+                    errorMsg("Unknown directory: " + dirname, options);
+
+                if (!dir.isDirectory())
+                    errorMsg(dirname + " is not a directory", options);
+
+                LoaderOptions opts = new LoaderOptions(dir);
+
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
+
+                if (cmd.hasOption(IGNORE_NODES_OPTION))
+                {
+                    String[] nodes = cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
+                    try
+                    {
+                        for (String node : nodes)
+                        {
+                            // Tolerate "a, b" style lists.
+                            opts.ignores.add(InetAddress.getByName(node.trim()));
+                        }
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        errorMsg(e.getMessage(), options);
+                    }
+                }
+
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                // Not reached (errorMsg exits); required by the compiler.
+                return null;
+            }
+        }
+
+        /** Prints {@code msg} on standard error followed by the usage, then exits with status 1. */
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        /** Prints {@code msg} on standard output unconditionally. */
+        public void output(String msg)
+        {
+            System.out.println(msg);
+        }
+
+        /**
+         * Prints {@code msg} on standard output when {@link #verbose} is set.
+         * NOTE(review): gated on verbose rather than on the debug flag; this looks
+         * deliberate since DEBUG_OPTION is described as "display stack traces" - confirm.
+         */
+        public void debug(String msg)
+        {
+            if (verbose)
+                System.out.println(msg);
+        }
+
+        /** Builds the set of options understood by {@link #parseArgs(String[])}. */
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION,        "display stack traces");
+            options.addOption("v",  VERBOSE_OPTION,      "verbose output");
+            options.addOption("h",  HELP_OPTION,         "display this help message");
+            options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
+            // Must be declared with an argument: parseArgs reads its value with getOptionValue().
+            options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
+            return options;
+        }
+
+        /** Prints the tool usage, a short description and the option list. */
+        public static void printUsage(Options options)
+        {
+            String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Bulk load the sstables found in the directory <dir_path> to the configured cluster. ");
+            header.append("The last directory of <dir_path> is used as the keyspace name. ");
+            header.append("So for instance, to load a sstable named Standard1-g-1-Data.db into keyspace Keyspace1, ");
+            header.append("you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db in a ");
+            header.append("directory Keyspace1/ in the current directory and call: sstableloader Keyspace1");
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+
+    /**
+     * {@link Options} with two convenience overloads that build the {@link Option}
+     * from its short name, long name and description.
+     */
+    private static class CmdLineOptions extends Options
+    {
+        /**
+         * Registers an option that takes an argument.
+         * @param opt short name of the option
+         * @param longOpt long name of the option
+         * @param argName name shown for the argument
+         * @param description help text of the option
+         * @return this Options object, updated
+         */
+        public Options addOption(String opt, String longOpt, String argName, String description)
+        {
+            Option withArgument = new Option(opt, longOpt, true, description);
+            withArgument.setArgName(argName);
+            return addOption(withArgument);
+        }
+
+        /**
+         * Registers a flag, i.e. an option that takes no argument.
+         * @param opt short name of the option
+         * @param longOpt long name of the option
+         * @param description help text of the option
+         * @return this Options object, updated
+         */
+        public Options addOption(String opt, String longOpt, String description)
+        {
+            Option flag = new Option(opt, longOpt, false, description);
+            return addOption(flag);
+        }
+    }
+}

Modified: 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
 Mon May 23 12:51:03 2011
@@ -41,6 +41,11 @@ public class BloomFilter extends Filter
         bitset = bs;
     }
 
+    public static BloomFilter emptyFilter()
+    {
+        return new BloomFilter(0, bucketsFor(0, 0)); // new filter from a first constructor argument of 0 and bucketsFor(0, 0); presumably holds no keys, per the method name - confirm
+    }
+
     public static ICompactSerializer2<BloomFilter> serializer()
     {
         return serializer_;


Reply via email to