Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.
The "Roger Mbiama" page has been changed by Roger Mbiama:
http://wiki.apache.org/cassandra/Roger%20Mbiama?action=diff&rev1=1&rev2=2

Describe Roger Mbiama here.

- conf/README.txt
+ Roger Mbiama/README.txt

Required configuration files
============================

@@ -33, +33 @@

  # See the License for the specific language governing permissions and
  # limitations under the License.

- calculate_heap_sizes()
- {
-     case "`uname`" in
-         Linux)
-             system_memory_in_mb=`free -m | awk '/Mem:/ {print $2}'`
-             system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
-             ;;
-         FreeBSD)
-             system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
-             system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
-             system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
-             ;;
-         SunOS)
-             system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
-             system_cpu_cores=`psrinfo | wc -l`
-             ;;
-         *)
-             # assume reasonable defaults for e.g. a modern desktop or
-             # cheap server
-             system_memory_in_mb="2048"
-             system_cpu_cores="2"
-             ;;
-     esac
-     max_heap_size_in_mb=`expr $system_memory_in_mb / 2`
-     MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
-
-     # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
-     max_sensible_yg_per_core_in_mb="100"
-     max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`
-
-     desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`
-
-     if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
-     then
-         HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
-     else
-         HEAP_NEWSIZE="${desired_yg_in_mb}M"
-     fi
- }
-
- # Override these to set the amount of memory to allocate to the JVM at
- # start-up. For production use you almost certainly want to adjust
- # this for your environment. MAX_HEAP_SIZE is the total amount of
- # memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size
- # of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should
- # be either set or not (if you set one, set the other).
- #
- # The main trade-off for the young generation is that the larger it
- # is, the longer GC pause times will be. The shorter it is, the more
- # expensive GC will be (usually).
- #
- # The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
- # times. If in doubt, and if you do not particularly want to tweak, go with
- # 100 MB per physical CPU core.
-
- #MAX_HEAP_SIZE="4G"
- #HEAP_NEWSIZE="800M"
-
- if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
-     calculate_heap_sizes
- else
-     if [ "x$MAX_HEAP_SIZE" = "x" ] || [ "x$HEAP_NEWSIZE" = "x" ]; then
-         echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
-         exit 1
-     fi
- fi
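To make the sizing arithmetic above concrete, here is a small worked example, assuming a hypothetical 8 GB (8192 MB), 8-core Linux machine; the numbers are illustrative only:

    # Hypothetical walk-through of calculate_heap_sizes() for an
    # 8 GB (8192 MB), 8-core box; values are illustrative, not measured.
    system_memory_in_mb=8192
    system_cpu_cores=8

    max_heap_size_in_mb=`expr $system_memory_in_mb / 2`      # 4096
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"                  # "4096M"

    max_sensible_yg_in_mb=`expr 100 "*" $system_cpu_cores`   # 800
    desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`         # 1024

    # 1024 > 800, so the young generation is capped at the sensible limit:
    HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"                 # "800M"

This matches the commented-out defaults below (4G heap, 800M young generation).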
- # Specifies the default port over which Cassandra will be available for
- # JMX connections.
- JMX_PORT="7199"
-
- # Here we create the arguments that will get passed to the jvm when
- # starting cassandra.
-
- # enable assertions. disabling this in production will give a modest
- # performance benefit (around 5%).
- JVM_OPTS="$JVM_OPTS -ea"
-
- # add the jamm javaagent
- check_openjdk=`"${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}'`
- if [ "$check_openjdk" != "OpenJDK" ]
- then
-     JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
- fi
-
- # enable thread priorities, primarily so we can give periodic tasks
- # a lower priority to avoid interfering with client workload
- JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
- # allows lowering thread priority without being root; see
- # http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
- JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"
-
- # min and max heap sizes should be set to the same value to avoid
- # stop-the-world GC pauses during resize, and so that we can lock the
- # heap in memory on startup to prevent any of it from being swapped
- # out.
- JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
- JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
- JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
- JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"
-
- if [ "`uname`" = "Linux" ] ; then
-     # reduce the per-thread stack size to minimize the impact of Thrift
-     # thread-per-client. (Best practice is for client connections to
-     # be pooled anyway.) Only do so on Linux where it is known to be
-     # supported.
-     JVM_OPTS="$JVM_OPTS -Xss128k"
- fi
-
- # GC tuning options
- JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
- JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
- JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
- JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
- JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
- JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
- JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
-
- # GC logging options -- uncomment to enable
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
- # JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
- # JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"
-
- # uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
- # JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"
-
- # Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
- # http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
- # comment out this entry to enable IPv6 support).
- JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"
-
- # jmx: metrics and administration interface
- #
- # add this if you're having trouble connecting:
- # JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
- #
- # see
- # http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
- # for more on configuring JMX through firewalls, etc. (Short version:
- # get it working with no firewall first.)
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
- JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
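With the JMX settings above (port 7199, SSL and authentication disabled), a stock JMX client can attach directly. A minimal sketch, assuming a reachable node named cassandra-host:

    # Attach jconsole (shipped with the JDK) to a node's JMX port, 7199
    # as configured above; "cassandra-host" is a placeholder hostname.
    jconsole cassandra-host:7199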

- /*
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
- /**
-  * Cassandra has a back door called the Binary Memtable. The purpose of this back door is to
-  * mass-import large amounts of data without using the Thrift interface.
-  *
-  * Inserting data through the Binary Memtable allows you to skip the commit log overhead and the
-  * Thrift ack on every insert. The example below uses Hadoop to generate all the data necessary
-  * to send to Cassandra, and sends it using the Binary Memtable interface. Hadoop ends up
-  * creating the actual data that gets put into an SSTable, as if you were using Thrift. With
-  * enough Hadoop nodes inserting the data, the bottleneck should become the network.
-  *
-  * We recommend adjusting the compaction threshold to 0 while the import is running. After the
-  * import, you need to run `nodeprobe -host <IP> flush_binary <Keyspace>` on every node, as this
-  * will flush the remaining data still left in memory to disk. Then it is recommended to adjust
-  * the compaction threshold back to its original value.
-  *
-  * The example below is a sample Hadoop job that inserts SuperColumns. It can be tweaked to work
-  * with normal Columns.
-  *
-  * Construct the data you want to import as rows delimited by newlines. You group by <Key>
-  * in the mapper, so that the end result organizes the data set into a column-oriented subset.
-  * In the reduce step, you generate the ColumnFamilies you want inserted and send them to your
-  * nodes.
-  *
-  * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for
-  * each row before proceeding to the next. This means that to keep Cassandra similarly busy you
-  * can either
-  * 1) add more reducer tasks,
-  * 2) remove the "wait for acks" block of code, or
-  * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
-  *
-  * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
-  */
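A minimal sketch of the post-import flush described above, assuming a hypothetical three-node cluster and the job's Keyspace1; the host addresses are placeholders:

    # Flush the Binary Memtable on every node after the import, using the
    # nodeprobe invocation quoted in the comment above. Node addresses and
    # keyspace are placeholders for your cluster.
    for ip in 10.0.0.1 10.0.0.2 10.0.0.3; do
        nodeprobe -host $ip flush_binary Keyspace1
    done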
-
- package org.apache.cassandra.bulkloader;
-
- import java.io.IOException;
- import java.net.InetAddress;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
-
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.clock.TimestampReconciler;
- import org.apache.cassandra.db.filter.QueryPath;
- import org.apache.cassandra.io.util.DataOutputBuffer;
- import org.apache.cassandra.net.IAsyncResult;
- import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.utils.FBUtilities;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.*;
-
- public class CassandraBulkLoader {
-     public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
-         public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-             // This is a simple identity mapper: each key/value pair passes through unchanged.
-             output.collect(key, value);
-         }
-     }
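For reference, an input record for this job might look as follows. The layout is inferred from KeyValueTextInputFormat (a tab separates key from value) and from the field indexing in reduce() below; it is an assumption, not a format spelled out by the original example:

    # Hypothetical input record: the tab splits key from value, and
    # reduce() splits the value on \1, reading fields[1..3] as super
    # column name, column name, and column value.
    printf 'row1\trow1\1SuperCol1\1Col1\1Value1\n' >> bulkload-input.txt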
-
-     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
-         private Path[] localFiles;
-         private JobConf jobconf;
-
-         public void configure(JobConf job) {
-             this.jobconf = job;
-             String cassConfig;
-
-             // Get the cached storage-conf.xml shipped with the job
-             try
-             {
-                 localFiles = DistributedCache.getLocalCacheFiles(job);
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             cassConfig = localFiles[0].getParent().toString();
-
-             System.setProperty("storage-config", cassConfig);
-
-             // Join the ring as a client (no data is stored locally)
-             try
-             {
-                 StorageService.instance.initClient();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             // Give gossip a few seconds to learn about the ring
-             try
-             {
-                 Thread.sleep(10 * 1000);
-             }
-             catch (InterruptedException e)
-             {
-                 throw new RuntimeException(e);
-             }
-         }
-
-         public void close()
-         {
-             try
-             {
-                 // release the cache
-                 DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf);
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             catch (URISyntaxException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             try
-             {
-                 // Sleep just in case the number of keys we send over is small
-                 Thread.sleep(3 * 1000);
-             }
-             catch (InterruptedException e)
-             {
-                 throw new RuntimeException(e);
-             }
-             StorageService.instance.stopClient();
-         }
-
-         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
-         {
-             String keyspace = "Keyspace1";
-             String cfName = "Super1";
-             List<ColumnFamily> columnFamilies = new LinkedList<ColumnFamily>();
-
-             /* Create a column family */
-             ColumnFamily columnFamily = ColumnFamily.create(keyspace, cfName);
-             while (values.hasNext()) {
-                 // Split the value (a line) on your own delimiter; \1 is used here
-                 String line = values.next().toString();
-                 String[] fields = line.split("\1");
-                 String superColumnName = fields[1];
-                 String columnName = fields[2];
-                 String columnValue = fields[3];
-                 int timestamp = 0;
-                 columnFamily.addColumn(new QueryPath(cfName, superColumnName.getBytes("UTF-8"), columnName.getBytes("UTF-8")), columnValue.getBytes("UTF-8"), new TimestampClock(timestamp));
-             }
-
-             columnFamilies.add(columnFamily);
-
-             /* Get serialized message to send to cluster */
-             Message message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies);
-             List<IAsyncResult> results = new ArrayList<IAsyncResult>();
-             for (InetAddress endpoint : StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes()))
-             {
-                 /* Send message to end point */
-                 results.add(MessagingService.instance.sendRR(message, endpoint));
-             }
-             /* wait for acks */
-             for (IAsyncResult result : results)
-             {
-                 try
-                 {
-                     result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                 }
-                 catch (TimeoutException e)
-                 {
-                     // you should probably add retry logic here
-                     throw new RuntimeException(e);
-                 }
-             }
-
-             output.collect(key, new Text(" inserted into Cassandra node(s)"));
-         }
-     }
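configure() above pulls storage-conf.xml out of the DistributedCache, and runJob() below registers it from HDFS at /cassandra/storage-conf.xml. A minimal sketch of staging that file, assuming the local config lives at /etc/cassandra/storage-conf.xml (a placeholder path):

    # Stage the Cassandra config on HDFS so the DistributedCache can ship
    # it to every reducer; the local path is a placeholder.
    hadoop fs -put /etc/cassandra/storage-conf.xml /cassandra/storage-conf.xml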
-
-     public static void runJob(String[] args)
-     {
-         JobConf conf = new JobConf(CassandraBulkLoader.class);
-
-         if (args.length >= 4)
-         {
-             conf.setNumReduceTasks(Integer.parseInt(args[3]));
-         }
-
-         try
-         {
-             // We store the cassandra storage-conf.xml on the HDFS cluster
-             DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf);
-         }
-         catch (URISyntaxException e)
-         {
-             throw new RuntimeException(e);
-         }
-         conf.setInputFormat(KeyValueTextInputFormat.class);
-         conf.setJobName("CassandraBulkLoader_v2");
-         conf.setMapperClass(Map.class);
-         conf.setReducerClass(Reduce.class);
-
-         conf.setOutputKeyClass(Text.class);
-         conf.setOutputValueClass(Text.class);
-
-         FileInputFormat.setInputPaths(conf, new Path(args[1]));
-         FileOutputFormat.setOutputPath(conf, new Path(args[2]));
-         try
-         {
-             JobClient.runJob(conf);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
-     }
-
-     public static Message createMessage(String keyspace, byte[] key, String cfName, List<ColumnFamily> columnFamilies)
-     {
-         ColumnFamily baseColumnFamily;
-         DataOutputBuffer bufOut = new DataOutputBuffer();
-         RowMutation rm;
-         Message message;
-         Column column;
-
-         /* Get the first column family from the list; this is just to get past validation */
-         baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
-                                             ClockType.Timestamp,
-                                             DatabaseDescriptor.getComparator(keyspace, cfName),
-                                             DatabaseDescriptor.getSubComparator(keyspace, cfName),
-                                             TimestampReconciler.instance,
-                                             CFMetaData.getId(keyspace, cfName));
-
-         for (ColumnFamily cf : columnFamilies) {
-             bufOut.reset();
-             ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
-             byte[] data = new byte[bufOut.getLength()];
-             System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
-
-             column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0));
-             baseColumnFamily.addColumn(column);
-         }
-         rm = new RowMutation(keyspace, key);
-         rm.add(baseColumnFamily);
-
-         try
-         {
-             /* Make message */
-             message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
-         }
-         catch (IOException e)
-         {
-             throw new RuntimeException(e);
-         }
-
-         return message;
-     }
-
-     public static void main(String[] args) throws Exception
-     {
-         runJob(args);
-     }
- }
-
- #!/bin/bash
- #
- # /etc/init.d/cassandra
- #
- # Startup script for Cassandra
- #
- # chkconfig: 2345 20 80
- # description: Starts and stops Cassandra
-
- . /etc/rc.d/init.d/functions
-
- export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
- export CASSANDRA_HOME=/usr/share/cassandra/
- export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
- export CASSANDRA_CONF=/etc/cassandra/conf
- export CASSANDRA_OWNR=cassandra
- log_file=/var/log/cassandra/cassandra.log
- pid_file=/var/run/cassandra/cassandra.pid
- CASSANDRA_PROG=/usr/sbin/cassandra
-
- case "$1" in
-     start)
-         # Cassandra startup
-         echo -n "Starting Cassandra: "
-         su $CASSANDRA_OWNR -c "$CASSANDRA_PROG -p $pid_file" > $log_file 2>&1
-         echo "OK"
-         ;;
-     stop)
-         # Cassandra shutdown
-         echo -n "Shutdown Cassandra: "
-         su $CASSANDRA_OWNR -c "kill `cat $pid_file`"
-         echo "OK"
-         ;;
-     reload|restart)
-         $0 stop
-         $0 start
-         ;;
-     status)
-         status -p $pid_file cassandra
-         ;;
-     *)
-         echo "Usage: `basename $0` start|stop|restart|reload|status"
-         exit 1
-         ;;
- esac
-
- exit 0
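Once the script is installed as /etc/init.d/cassandra, the chkconfig header above lets it be registered and driven in the usual RHEL/CentOS way; a sketch, run as root:

    # Register the service with the runlevels declared in the chkconfig
    # header, then start it and check its status.
    chkconfig --add cassandra
    chkconfig cassandra on
    service cassandra start
    service cassandra status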
