Dear Wiki user, You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.
The "conf/README.txt" page has been changed by DaveBrosius:
http://wiki.apache.org/cassandra/conf/README.txt?action=diff&rev1=9&rev2=10

conf/README.txt

Required configuration files
============================

# See the License for the specific language governing permissions and
# limitations under the License.

calculate_heap_sizes()
{
    case "`uname`" in
        Linux)
            system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
            system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
            break
        ;;
        FreeBSD)
            system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
            system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
            system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
            break
        ;;
        SunOS)
            system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
            system_cpu_cores=`psrinfo | wc -l`
            break
        ;;
        *)
            # assume reasonable defaults for e.g. a modern desktop or
            # cheap server
            system_memory_in_mb="2048"
            system_cpu_cores="2"
        ;;
    esac
    max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
    max_sensible_yg_per_core_in_mb="100"
    max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`

    desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`

    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
    then
        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
    else
        HEAP_NEWSIZE="${desired_yg_in_mb}M"
    fi
}

# Override these to set the amount of memory to allocate to the JVM at
# start-up. For production use you almost certainly want to adjust
# this for your environment. MAX_HEAP_SIZE is the total amount of
# memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
# of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
# be either set or not (if you set one, set the other).
#
# The main trade-off for the young generation is that the larger it
# is, the longer GC pause times will be. The shorter it is, the more
# expensive GC will be (usually).
#
# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
# times. If in doubt, and if you do not particularly want to tweak, go with
# 100 MB per physical CPU core.

#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
    calculate_heap_sizes
else
    if [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then
        echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
        exit 1
    fi
fi
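# As an illustration of the sizing logic above: on a hypothetical 16 GB, 8-core
# Linux box the calculated defaults would work out to
#   MAX_HEAP_SIZE          = 16384 / 2 = 8192M
#   max sensible young gen = 100 * 8   = 800M
#   desired young gen      = 8192 / 4  = 2048M
# and since 2048M exceeds the sensible cap, HEAP_NEWSIZE would be set to 800M.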
# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"


# Here we create the arguments that will get passed to the jvm when
# starting cassandra.

# enable assertions. disabling this in production will give a modest
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"

# add the jamm javaagent
check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
if [ "$check_openjdk" != "OpenJDK" ]
then
    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
fi

# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
# allows lowering thread priority without being root. see
# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"

# min and max heap sizes should be set to the same value to avoid
# stop-the-world GC pauses during resize, and so that we can lock the
# heap in memory on startup to prevent any of it from being swapped
# out.
JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"

if [ "`uname`" = "Linux" ] ; then
    # reduce the per-thread stack size to minimize the impact of Thrift
    # thread-per-client. (Best practice is for client connections to
    # be pooled anyway.) Only do so on Linux where it is known to be
    # supported.
    JVM_OPTS="$JVM_OPTS -Xss128k"
fi

# GC tuning options
JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"

# GC logging options -- uncomment to enable
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"

# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"

# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
# comment out this entry to enable IPv6 support).
JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"

# jmx: metrics and administration interface
#
# add this if you're having trouble connecting:
# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
#
# see
# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
# for more on configuring JMX through firewalls, etc. (Short version:
# get it working with no firewall first.)
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
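With the JMX options above in place, a running node can be reached on port 7199 with the stock JDK
tools; for example (substituting the node's hostname):

    jconsole <hostname>:7199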
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Cassandra has a back door called the Binary Memtable. The purpose of this back door is to
 * mass-import large amounts of data without using the Thrift interface.
 *
 * Inserting data through the binary memtable allows you to skip the commit log overhead and the
 * ack from Thrift on every insert. The example below uses Hadoop to generate all the data necessary
 * to send to Cassandra, and sends it using the Binary Memtable interface. What Hadoop ends up doing is
 * creating the actual data that gets put into an SSTable as if you were using Thrift. With enough Hadoop
 * nodes inserting the data, the bottleneck should become the network.
 *
 * We recommend adjusting the compaction threshold to 0 while the import is running. After the import,
 * you need to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as this will flush the
 * remaining data still left in memory to disk. Then it's recommended to adjust the compaction threshold
 * back to its original value.
 *
 * The example below is a sample Hadoop job, and it inserts SuperColumns. It can be tweaked to work with
 * normal Columns.
 *
 * You should construct the data you want to import as rows delimited by a new line. You end up grouping
 * by <Key> in the mapper, so that the end result generates the data set into a column-oriented subset.
 * Once you get to the reduce step, you can generate the ColumnFamilies you want inserted, and send them
 * to your nodes.
 *
 * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row
 * before proceeding to the next. This means that to keep Cassandra similarly busy you can either
 * 1) add more reducer tasks,
 * 2) remove the "wait for acks" block of code, or
 * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
 *
 * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
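 *
 * Note on configuration: the reduce task below writes into the keyspace "Keyspace1" and the super
 * column family "Super1" (the keyspace and cfName variables in Reduce.reduce()); both must be defined
 * in the storage-conf.xml that the job ships to the tasks, so adjust those names to match your schema.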
 */

package org.apache.cassandra.bulkloader;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.io.util.DataOutputBuffer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class CassandraBulkLoader {
    public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
        private Text word = new Text();

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // This is a simple key/value mapper.
            output.collect(key, value);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        private Path[] localFiles;
        private ArrayList<String> tokens = new ArrayList<String>();
        private JobConf jobconf;

        public void configure(JobConf job) {
            this.jobconf = job;
            String cassConfig;

            // Get the cached files
            try
            {
                localFiles = DistributedCache.getLocalCacheFiles(job);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            cassConfig = localFiles[0].getParent().toString();

            System.setProperty("storage-config", cassConfig);

            try
            {
                StorageService.instance.initClient();
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            try
            {
                Thread.sleep(10*1000);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
        }

        public void close()
        {
            try
            {
                // release the cache
                DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf);
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            catch (URISyntaxException e)
            {
                throw new RuntimeException(e);
            }
            try
            {
                // Sleep just in case the number of keys we send over is small
                Thread.sleep(3*1000);
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException(e);
            }
            StorageService.instance.stopClient();
        }
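
        // For each key, the reducer below rebuilds a ColumnFamily from the \1-delimited values,
        // serializes it into a BINARY-verb message (see createMessage) and sends that message to
        // every natural endpoint for the key, waiting for an ack from each before moving on.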
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            ColumnFamily columnFamily;
            String keyspace = "Keyspace1";
            String cfName = "Super1";
            Message message;
            List<ColumnFamily> columnFamilies;
            columnFamilies = new LinkedList<ColumnFamily>();
            String line;

            /* Create a column family */
            columnFamily = ColumnFamily.create(keyspace, cfName);
            while (values.hasNext()) {
                // Split the value (line based on your own delimiter)
                line = values.next().toString();
                String[] fields = line.split("\1");
                String SuperColumnName = fields[1];
                String ColumnName = fields[2];
                String ColumnValue = fields[3];
                int timestamp = 0;
                columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), new TimestampClock(timestamp));
            }

            columnFamilies.add(columnFamily);

            /* Get serialized message to send to cluster */
            message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies);
            List<IAsyncResult> results = new ArrayList<IAsyncResult>();
            for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes()))
            {
                /* Send message to end point */
                results.add(MessagingService.instance.sendRR(message, endpoint));
            }
            /* wait for acks */
            for (IAsyncResult result : results)
            {
                try
                {
                    result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e)
                {
                    // you should probably add retry logic here
                    throw new RuntimeException(e);
                }
            }

            output.collect(key, new Text(" inserted into Cassandra node(s)"));
        }
    }

    public static void runJob(String[] args)
    {
        JobConf conf = new JobConf(CassandraBulkLoader.class);

        if (args.length >= 4)
        {
            conf.setNumReduceTasks(new Integer(args[3]));
        }

        try
        {
            // We store the cassandra storage-conf.xml on the HDFS cluster
            DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf);
        }
        catch (URISyntaxException e)
        {
            throw new RuntimeException(e);
        }
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setJobName("CassandraBulkLoader_v2");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[1]));
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        try
        {
            JobClient.runJob(conf);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

    public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily> ColumnFamiles)
    {
        ColumnFamily baseColumnFamily;
        DataOutputBuffer bufOut = new DataOutputBuffer();
        RowMutation rm;
        Message message;
        Column column;

        /* Get the first column family from list, this is just to get past validation */
        baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
                                            ClockType.Timestamp,
                                            DatabaseDescriptor.getComparator(Keyspace, CFName),
                                            DatabaseDescriptor.getSubComparator(Keyspace, CFName),
                                            TimestampReconciler.instance,
                                            CFMetaData.getId(Keyspace, CFName));

        for (ColumnFamily cf : ColumnFamiles) {
            bufOut.reset();
            ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
            byte[] data = new byte[bufOut.getLength()];
            System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());

            column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0));
            baseColumnFamily.addColumn(column);
        }
        rm = new RowMutation(Keyspace, Key);
        rm.add(baseColumnFamily);

        try
        {
            /* Make message */
            message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }

        return message;
    }

    public static void main(String[] args) throws Exception
    {
        runJob(args);
    }
}
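
To drive the job: main() forwards its arguments straight to runJob(), which reads the input path from
args[1], the output path from args[2] and an optional reducer count from args[3] (args[0] is unused, so
any placeholder will do). Assuming the class has been packed into a jar (the jar name and paths below
are only examples) and storage-conf.xml has been uploaded to /cassandra/storage-conf.xml on HDFS, an
invocation would look roughly like:

    hadoop jar cassandra-bulkloader.jar org.apache.cassandra.bulkloader.CassandraBulkLoader job /user/me/rows /user/me/bulkload-out 8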
#!/bin/bash
#
# /etc/init.d/cassandra
#
# Startup script for Cassandra
#
# chkconfig: 2345 20 80
# description: Starts and stops Cassandra

. /etc/rc.d/init.d/functions

export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
export CASSANDRA_HOME=/usr/share/cassandra/
export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
export CASSANDRA_CONF=/etc/cassandra/conf
export CASSANDRA_OWNR=cassandra
log_file=/var/log/cassandra/cassandra.log
pid_file=/var/run/cassandra/cassandra.pid
CASSANDRA_PROG=/usr/sbin/cassandra


case "$1" in
    start)
        # Cassandra startup
        echo -n "Starting Cassandra: "
        su $CASSANDRA_OWNR -c "$CASSANDRA_PROG -p $pid_file" > $log_file 2>&1
        echo "OK"
        ;;
    stop)
        # Cassandra shutdown
        echo -n "Shutdown Cassandra: "
        su $CASSANDRA_OWNR -c "kill `cat $pid_file`"
        echo "OK"
        ;;
    reload|restart)
        $0 stop
        $0 start
        ;;
    status)
        ;;
    *)
        echo "Usage: `basename $0` start|stop|restart|reload"
        exit 1
esac

exit 0
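
Since the script above follows the usual Red Hat init conventions (chkconfig header, sourcing
/etc/rc.d/init.d/functions), once copied to /etc/init.d/cassandra with paths adjusted for your install
it can be registered and exercised with the standard tools, for example:

    chkconfig --add cassandra
    service cassandra start
    service cassandra stop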
