Author: slebresne
Date: Mon May 23 12:51:03 2011
New Revision: 1126477
URL: http://svn.apache.org/viewvc?rev=1126477&view=rev
Log:
Add sstable bulk loading utility
patch by slebresne; reviewed by jbellis for CASSANDRA-1278
Added:
cassandra/branches/cassandra-0.8/bin/sstableloader
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon May 23 12:51:03 2011
@@ -17,6 +17,7 @@
* Improve forceDeserialize/getCompactedRow encapsulation (CASSANDRA-2659)
* Assert ranges are not overlapping in AB.normalize (CASSANDRA-2641)
* Don't write CounterUpdateColumn to disk in tests (CASSANDRA-2650)
+ * Add sstable bulk loading utility (CASSANDRA-1278)
0.8.0-final
Added: cassandra/branches/cassandra-0.8/bin/sstableloader
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/bin/sstableloader?rev=1126477&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/bin/sstableloader (added)
+++ cassandra/branches/cassandra-0.8/bin/sstableloader Mon May 23 12:51:03 2011
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+ for include in /usr/share/cassandra/cassandra.in.sh \
+ /usr/local/share/cassandra/cassandra.in.sh \
+ /opt/cassandra/cassandra.in.sh \
+ ~/.cassandra.in.sh \
+ `dirname $0`/cassandra.in.sh; do
+ if [ -r $include ]; then
+ . $include
+ break
+ fi
+ done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+ . $CASSANDRA_INCLUDE
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x $JAVA_HOME/bin/java ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
+fi
+
+if [ -z $CLASSPATH ]; then
+ echo "You must set the CLASSPATH var" >&2
+ exit 1
+fi
+
+$JAVA -ea -cp $CLASSPATH -Xmx256M \
+ -Dlog4j.configuration=log4j-tools.properties \
+ org.apache.cassandra.tools.BulkLoader "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTable.java
Mon May 23 12:51:03 2011
@@ -85,9 +85,10 @@ public abstract class SSTable
protected SSTable(Descriptor descriptor, Set<Component> components,
CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner,
EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
{
+ // In almost all cases, metadata shouldn't be null, but allowing null
makes it possible to create a mostly functional SSTable without
+ // a full schema definition. SSTableLoader uses that ability.
assert descriptor != null;
assert components != null;
- assert metadata != null;
assert replayPosition != null;
assert partitioner != null;
assert rowSizes != null;
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1126477&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
(added)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
Mon May 23 12:51:03 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Cassandra SSTable bulk loader.
+ * Load an externally created sstable into a cluster.
+ */
+public class SSTableLoader
+{
+ private final File directory;
+ private final String keyspace;
+ private final Client client;
+ private final OutputHandler outputHandler;
+
+ public SSTableLoader(File directory, Client client, OutputHandler
outputHandler)
+ {
+ this.directory = directory;
+ this.keyspace = directory.getName();
+ this.client = client;
+ this.outputHandler = outputHandler;
+ }
+
+ private Collection<SSTableReader> openSSTables()
+ {
+ final List<SSTableReader> sstables = new LinkedList<SSTableReader>();
+
+ directory.list(new FilenameFilter()
+ {
+ public boolean accept(File dir, String name)
+ {
+ Pair<Descriptor, Component> p =
SSTable.tryComponentFromFilename(dir, name);
+ Descriptor desc = p == null ? null : p.left;
+ if (p == null || !p.right.equals(Component.DATA) ||
desc.temporary)
+ return false;
+
+ if (!new
File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ outputHandler.output(String.format("Skipping file %s
because index is missing", name));
+ return false;
+ }
+
+ if (!client.validateColumnFamily(keyspace, desc.cfname))
+ {
+ outputHandler.output(String.format("Skipping file %s:
column family %s.%s doesn't exist", name, keyspace, desc.cfname));
+ return false;
+ }
+
+ Set<Component> components = new HashSet<Component>();
+ components.add(Component.DATA);
+ components.add(Component.PRIMARY_INDEX);
+
+ try
+ {
+ sstables.add(SSTableReader.open(desc, components, null,
StorageService.getPartitioner()));
+ }
+ catch (IOException e)
+ {
+ outputHandler.output(String.format("Skipping file %s,
error opening it: %s", name, e.getMessage()));
+ }
+ return false;
+ }
+ });
+ return sstables;
+ }
+
+ public LoaderFuture stream() throws IOException
+ {
+ return stream(Collections.<InetAddress>emptySet());
+ }
+
+ public LoaderFuture stream(Set<InetAddress> toIgnore) throws IOException
+ {
+ client.init();
+
+ Collection<SSTableReader> sstables = openSSTables();
+ if (sstables.isEmpty())
+ {
+ outputHandler.output("No sstables to stream");
+ return new LoaderFuture(0);
+ }
+
+ Map<InetAddress, Collection<Range>> endpointToRanges =
client.getEndpointToRangesMap();
+ outputHandler.output(String.format("Streaming revelant part of %sto
%s", names(sstables), endpointToRanges.keySet()));
+
+ // There will be one streaming session per endpoint
+ LoaderFuture future = new LoaderFuture(endpointToRanges.size());
+ for (Map.Entry<InetAddress, Collection<Range>> entry :
endpointToRanges.entrySet())
+ {
+ InetAddress remote = entry.getKey();
+ if (toIgnore.contains(remote))
+ {
+ future.latch.countDown();
+ continue;
+ }
+ Collection<Range> ranges = entry.getValue();
+ StreamOutSession session = StreamOutSession.create(keyspace,
remote, new CountDownCallback(future.latch, remote));
+ StreamOut.transferSSTables(session, sstables, ranges,
OperationType.BULK_LOAD);
+ future.setPendings(remote, session.getFiles());
+ }
+ return future;
+ }
+
+ public static class LoaderFuture implements Future<Void>
+ {
+ final CountDownLatch latch;
+ final Map<InetAddress, Collection<PendingFile>> pendingFiles;
+
+ private LoaderFuture(int request)
+ {
+ latch = new CountDownLatch(request);
+ pendingFiles = new HashMap<InetAddress, Collection<PendingFile>>();
+ }
+
+ private void setPendings(InetAddress remote, Collection<PendingFile>
files)
+ {
+ pendingFiles.put(remote, new ArrayList(files));
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ throw new UnsupportedOperationException("Cancellation is not yet
supported");
+ }
+
+ public Void get() throws InterruptedException
+ {
+ latch.await();
+ return null;
+ }
+
+ public Void get(long timeout, TimeUnit unit) throws
InterruptedException
+ {
+ latch.await(timeout, unit);
+ return null;
+ }
+
+ public boolean isCancelled()
+ {
+ // For now, cancellation is not supported, maybe one day...
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return latch.getCount() == 0;
+ }
+
+ public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
+ {
+ return pendingFiles;
+ }
+ }
+
+ private String names(Collection<SSTableReader> sstables)
+ {
+ StringBuilder builder = new StringBuilder();
+ for (SSTableReader sstable : sstables)
+
builder.append(sstable.descriptor.filenameFor(Component.DATA)).append(" ");
+ return builder.toString();
+ }
+
+ private class CountDownCallback implements Runnable
+ {
+ private final InetAddress endpoint;
+ private final CountDownLatch latch;
+
+ CountDownCallback(CountDownLatch latch, InetAddress endpoint)
+ {
+ this.latch = latch;
+ this.endpoint = endpoint;
+ }
+
+ public void run()
+ {
+ latch.countDown();
+ outputHandler.debug(String.format("Streaming session to %s
completed (waiting on %d outstanding sessions)", endpoint, latch.getCount()));
+
+ // There could be a race where stop() is called twice, but that should
be ok
+ if (latch.getCount() == 0)
+ client.stop();
+ }
+ }
+
+ public interface OutputHandler
+ {
+ // called when important information needs to be displayed
+ public void output(String msg);
+
+ // called when less important information needs to be displayed
+ public void debug(String msg);
+ }
+
+ public static abstract class Client
+ {
+ private final Map<InetAddress, Collection<Range>> endpointToRanges =
new HashMap<InetAddress, Collection<Range>>();
+
+ /**
+ * Initialize the client.
+ * Perform any step necessary so that after the call to this
+ * method:
+ * * StorageService is correctly initialized (so that gossip and
+ * the messaging service are too)
+ * * getEndpointToRangesMap() returns a correct map
+ * This method is guaranteed to be called before any other method of a
+ * client.
+ */
+ public abstract void init();
+
+ /**
+ * Stop the client.
+ */
+ public void stop() {}
+
+ /**
+ * Validate that {@code keyspace} is an existing keyspace and {@code
+ * cfName} is one of its existing column families.
+ */
+ public abstract boolean validateColumnFamily(String keyspace, String
cfName);
+
+ public Map<InetAddress, Collection<Range>> getEndpointToRangesMap()
+ {
+ return endpointToRanges;
+ }
+
+ protected void addRangeForEndpoint(Range range, InetAddress endpoint)
+ {
+ Collection<Range> ranges = endpointToRanges.get(endpoint);
+ if (ranges == null)
+ {
+ ranges = new HashSet<Range>();
+ endpointToRanges.put(endpoint, ranges);
+ }
+ ranges.add(range);
+ }
+ }
+}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Mon May 23 12:51:03 2011
@@ -150,6 +150,9 @@ public class SSTableReader extends SSTab
public static SSTableReader open(Descriptor descriptor, Set<Component>
components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData
metadata, IPartitioner partitioner) throws IOException
{
assert partitioner != null;
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA);
+ assert components.contains(Component.PRIMARY_INDEX);
long start = System.currentTimeMillis();
logger.info("Opening " + descriptor);
@@ -158,7 +161,7 @@ public class SSTableReader extends SSTab
EstimatedHistogram columnCounts;
File statsFile = new
File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
ReplayPosition rp = ReplayPosition.NONE;
- if (statsFile.exists())
+ if (components.contains(Component.STATS) && statsFile.exists())
{
DataInputStream dis = null;
try
@@ -249,6 +252,12 @@ public class SSTableReader extends SSTab
void loadBloomFilter() throws IOException
{
+ if (!components.contains(Component.FILTER))
+ {
+ bf = BloomFilter.emptyFilter();
+ return;
+ }
+
DataInputStream stream = null;
try
{
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
Mon May 23 12:51:03 2011
@@ -18,6 +18,7 @@
package org.apache.cassandra.service;
+import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@@ -50,6 +51,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
+import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -327,6 +329,8 @@ public class StorageService implements I
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
MessagingService.instance().shutdown();
+ // give it a second so that tasks accepted before the MessagingService
shutdown get submitted to the stage (to avoid RejectedExecutionException)
+ try { Thread.sleep(1000L); } catch (InterruptedException e) {}
StageManager.shutdownNow();
}
@@ -354,7 +358,7 @@ public class StorageService implements I
// sleep a while to allow gossip to warm up (the other nodes need to
know about this one before they can reply).
try
{
- Thread.sleep(5000L);
+ Thread.sleep(RING_DELAY);
}
catch (Exception ex)
{
@@ -2428,4 +2432,37 @@ public class StorageService implements I
return new Pair<Set<Range>, Set<Range>>(toStream, toFetch);
}
+ public void bulkLoad(String directory)
+ {
+ File dir = new File(directory);
+
+ if (!dir.exists() || !dir.isDirectory())
+ throw new IllegalArgumentException("Invalid directory " +
directory);
+
+ SSTableLoader.Client client = new SSTableLoader.Client()
+ {
+ public void init() {}
+
+ public boolean validateColumnFamily(String keyspace, String cfName)
+ {
+ return DatabaseDescriptor.getCFMetaData(keyspace, cfName) !=
null;
+ }
+ };
+
+ SSTableLoader.OutputHandler oh = new SSTableLoader.OutputHandler()
+ {
+ public void output(String msg) { logger_.info(msg); }
+ public void debug(String msg) { logger_.debug(msg); }
+ };
+
+ SSTableLoader loader = new SSTableLoader(dir, client, oh);
+ try
+ {
+ loader.stream().get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Mon May 23 12:51:03 2011
@@ -290,4 +290,6 @@ public interface StorageServiceMBean
public boolean isJoined();
public void setCompactionThroughputMbPerSec(int value);
+
+ public void bulkLoad(String directory);
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/OperationType.java
Mon May 23 12:51:03 2011
@@ -26,6 +26,7 @@ public enum OperationType
AES,
BOOTSTRAP,
UNBOOTSTRAP,
- RESTORE_REPLICA_COUNT;
+ RESTORE_REPLICA_COUNT,
+ BULK_LOAD;
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Mon May 23 12:51:03 2011
@@ -134,7 +134,7 @@ public class StreamOutSession
Thread.sleep(10);
}
- Collection<PendingFile> getFiles()
+ public Collection<PendingFile> getFiles()
{
return files.values();
}
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java?rev=1126477&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
(added)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/BulkLoader.java
Mon May 23 12:51:03 2011
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PendingFile;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.commons.cli.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class BulkLoader
+{
+ private static final String TOOL_NAME = "sstableloader";
+ private static final String VERBOSE_OPTION = "verbose";
+ private static final String DEBUG_OPTION = "debug";
+ private static final String HELP_OPTION = "help";
+ private static final String NOPROGRESS_OPTION = "no-progress";
+ private static final String IGNORE_NODES_OPTION = "ignore";
+
+ public static void main(String args[]) throws IOException
+ {
+ LoaderOptions options = LoaderOptions.parseArgs(args);
+ try
+ {
+ SSTableLoader loader = new SSTableLoader(options.directory, new
ExternalClient(options.directory.getName(), options), options);
+ SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
+
+ if (options.noProgress)
+ {
+ future.get();
+ }
+ else
+ {
+ ProgressIndicator indicator = new
ProgressIndicator(future.getPendingFiles());
+ indicator.start();
+ System.out.println("");
+ while (!future.isDone())
+ {
+ if (indicator.printProgress())
+ {
+ // We're done with streaming
+ System.out.println("\nWaiting for targets to rebuild
indexes ...");
+ future.get();
+ assert future.isDone();
+ }
+ else
+ {
+ try { Thread.sleep(1000L); } catch (Exception e) {}
+ }
+ }
+ }
+
+ System.exit(0); // We need that to stop non daemonized threads
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ if (options.debug)
+ e.printStackTrace(System.err);
+ System.exit(1);
+ }
+ }
+
+ // Tracks and prints streaming progress; printProgress() returns true when everything is at 100%
+ static class ProgressIndicator
+ {
+ private final Map<InetAddress, Collection<PendingFile>> filesByHost;
+ private long startTime;
+ private long lastProgress;
+ private long lastTime;
+
+ public ProgressIndicator(Map<InetAddress, Collection<PendingFile>>
filesByHost)
+ {
+ this.filesByHost = new HashMap<InetAddress,
Collection<PendingFile>>(filesByHost);
+ }
+
+ public void start()
+ {
+ startTime = System.currentTimeMillis();
+ }
+
+ public boolean printProgress()
+ {
+ boolean done = true;
+ StringBuilder sb = new StringBuilder();
+ sb.append("\rprogress: ");
+ long totalProgress = 0;
+ long totalSize = 0;
+ for (Map.Entry<InetAddress, Collection<PendingFile>> entry :
filesByHost.entrySet())
+ {
+ long progress = 0;
+ long size = 0;
+ int completed = 0;
+ Collection<PendingFile> pendings = entry.getValue();
+ for (PendingFile f : pendings)
+ {
+ progress += f.progress;
+ size += f.size;
+ if (f.progress == f.size)
+ completed++;
+ }
+ totalProgress += progress;
+ totalSize += size;
+ if (completed != pendings.size())
+ done = false;
+ sb.append("[").append(entry.getKey());
+ sb.append("
").append(completed).append("/").append(pendings.size());
+ sb.append(" (").append(size == 0 ? 100L : progress * 100L /
size).append(")] ");
+ }
+ long time = System.currentTimeMillis();
+ long deltaTime = time - lastTime;
+ lastTime = time;
+ long deltaProgress = totalProgress - lastProgress;
+ lastProgress = totalProgress;
+
+ sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress
* 100L / totalSize).append(" - ");
+ sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
+ sb.append(" (avg: ").append(mbPerSec(totalProgress, time -
startTime)).append("MB/s)]");;
+ System.out.print(sb.toString());
+ return done;
+ }
+
+ private int mbPerSec(long bytes, long timeInMs)
+ {
+ double bytesPerMs = ((double)bytes) / timeInMs;
+ return (int)((bytesPerMs * 1000) / (1024 * 2024));
+ }
+ }
+
+ static class ExternalClient extends SSTableLoader.Client
+ {
+ private final String keyspace;
+ private final Map<String, Set<String>> knownCfs = new HashMap<String,
Set<String>>();
+ private final SSTableLoader.OutputHandler outputHandler;
+
+ public ExternalClient(String keyspace, SSTableLoader.OutputHandler
outputHandler)
+ {
+ super();
+ this.keyspace = keyspace;
+ this.outputHandler = outputHandler;
+ }
+
+ public void init()
+ {
+ outputHandler.output(String.format("Starting client (and waiting
%d seconds for gossip) ...", StorageService.RING_DELAY / 1000));
+ try
+ {
+ // Init gossip
+ StorageService.instance.initClient();
+
+ Set<InetAddress> hosts = Gossiper.instance.getLiveMembers();
+ hosts.remove(FBUtilities.getLocalAddress());
+ if (hosts.isEmpty())
+ throw new IllegalStateException("Cannot load any sstable,
no live member found in the cluster");
+
+ // Query endpoint to ranges map and schemas from thrift
+ String host = hosts.iterator().next().toString().substring(1);
+ int port = DatabaseDescriptor.getRpcPort();
+
+ Cassandra.Client client = createThriftClient(host, port);
+ List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+ List<KsDef> ksDefs = client.describe_keyspaces();
+
+ Token.TokenFactory tkFactory =
StorageService.getPartitioner().getTokenFactory();
+
+ try
+ {
+ for (TokenRange tr : tokenRanges)
+ {
+ Range range = new
Range(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+ for (String ep : tr.endpoints)
+ {
+ addRangeForEndpoint(range,
InetAddress.getByName(ep));
+ }
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException("Got an unknow host from
describe_ring()", e);
+ }
+
+ for (KsDef ksDef : ksDefs)
+ {
+ Set<String> cfs = new HashSet<String>();
+ for (CfDef cfDef : ksDef.cf_defs)
+ cfs.add(cfDef.name);
+ knownCfs.put(ksDef.name, cfs);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ StorageService.instance.stopClient();
+ }
+
+ public boolean validateColumnFamily(String keyspace, String cfName)
+ {
+ Set<String> cfs = knownCfs.get(keyspace);
+ return cfs != null && cfs.contains(cfName);
+ }
+
+ private static Cassandra.Client createThriftClient(String host, int
port) throws TTransportException
+ {
+ TSocket socket = new TSocket(host, port);
+ TTransport trans = new TFramedTransport(socket);
+ trans.open();
+ TProtocol protocol = new TBinaryProtocol(trans);
+ return new Cassandra.Client(protocol);
+ }
+ }
+
+ static class LoaderOptions implements SSTableLoader.OutputHandler
+ {
+ public final File directory;
+
+ public boolean debug;
+ public boolean verbose;
+ public boolean noProgress;
+
+ public Set<InetAddress> ignores = new HashSet<InetAddress>();
+
+ LoaderOptions(File directory)
+ {
+ this.directory = directory;
+ }
+
+ public static LoaderOptions parseArgs(String cmdArgs[])
+ {
+ CommandLineParser parser = new GnuParser();
+ CmdLineOptions options = getCmdLineOptions();
+ try
+ {
+ CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+ if (cmd.hasOption(HELP_OPTION))
+ {
+ printUsage(options);
+ System.exit(0);
+ }
+
+ String[] args = cmd.getArgs();
+ if (args.length == 0)
+ {
+ System.err.println("Missing sstable directory argument");
+ printUsage(options);
+ System.exit(1);
+ }
+
+ if (args.length > 1)
+ {
+ System.err.println("Too many arguments");
+ printUsage(options);
+ System.exit(1);
+ }
+
+ String dirname = args[0];
+ File dir = new File(dirname);
+
+ if (!dir.exists())
+ errorMsg("Unknown directory: " + dirname, options);
+
+ if (!dir.isDirectory())
+ errorMsg(dirname + " is not a directory", options);
+
+ LoaderOptions opts = new LoaderOptions(dir);
+
+ opts.debug = cmd.hasOption(DEBUG_OPTION);
+ opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+ opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
+
+ if (cmd.hasOption(IGNORE_NODES_OPTION))
+ {
+ String[] nodes =
cmd.getOptionValue(IGNORE_NODES_OPTION).split(",");
+ try
+ {
+ for (String node : nodes)
+ {
+ opts.ignores.add(InetAddress.getByName(node));
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ errorMsg(e.getMessage(), options);
+ }
+ }
+
+ return opts;
+ }
+ catch (ParseException e)
+ {
+ errorMsg(e.getMessage(), options);
+ return null;
+ }
+ }
+
+ private static void errorMsg(String msg, CmdLineOptions options)
+ {
+ System.err.println(msg);
+ printUsage(options);
+ System.exit(1);
+ }
+
+ public void output(String msg)
+ {
+ System.out.println(msg);
+ }
+
+ public void debug(String msg)
+ {
+ if (verbose)
+ System.out.println(msg);
+ }
+
+ private static CmdLineOptions getCmdLineOptions()
+ {
+ CmdLineOptions options = new CmdLineOptions();
+ options.addOption(null, DEBUG_OPTION, "display stack
traces");
+ options.addOption("v", VERBOSE_OPTION, "verbose output");
+ options.addOption("h", HELP_OPTION, "display this help
message");
+ options.addOption(null, NOPROGRESS_OPTION, "don't display
progress");
+ options.addOption("i", IGNORE_NODES_OPTION, "don't stream to this
(comma separated) list of nodes");
+ return options;
+ }
+
+ public static void printUsage(Options options)
+ {
+ String usage = String.format("%s [options] <dir_path>", TOOL_NAME);
+ StringBuilder header = new StringBuilder();
+ header.append("--\n");
+ header.append("Bulk load the sstables find in the directory
<dir_path> to the configured cluster." );
+ header.append("The last directory of <dir_path> is used as the
keyspace name. ");
+ header.append("So for instance, to load a sstable named
Standard1-g-1-Data.db into keyspace Keyspace1, ");
+ header.append("you will need to have the files
Standard1-g-1-Data.db and Standard1-g-1-Index.db in a ");
+ header.append("directory Keyspace1/ in the current directory and
call: sstableloader Keyspace1");
+ header.append("\n--\n");
+ header.append("Options are:");
+ new HelpFormatter().printHelp(usage, header.toString(), options,
"");
+ }
+ }
+
+ private static class CmdLineOptions extends Options
+ {
+ /**
+ * Add option with argument and argument name
+ * @param opt shortcut for option name
+ * @param longOpt complete option name
+ * @param argName argument name
+ * @param description description of the option
+ * @return updated Options object
+ */
+ public Options addOption(String opt, String longOpt, String argName,
String description)
+ {
+ Option option = new Option(opt, longOpt, true, description);
+ option.setArgName(argName);
+
+ return addOption(option);
+ }
+
+ /**
+ * Add option without argument
+ * @param opt shortcut for option name
+ * @param longOpt complete option name
+ * @param description description of the option
+ * @return updated Options object
+ */
+ public Options addOption(String opt, String longOpt, String
description)
+ {
+ return addOption(new Option(opt, longOpt, false, description));
+ }
+ }
+}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java?rev=1126477&r1=1126476&r2=1126477&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/BloomFilter.java
Mon May 23 12:51:03 2011
@@ -41,6 +41,11 @@ public class BloomFilter extends Filter
bitset = bs;
}
+ public static BloomFilter emptyFilter()
+ {
+ return new BloomFilter(0, bucketsFor(0, 0));
+ }
+
public static ICompactSerializer2<BloomFilter> serializer()
{
return serializer_;