Updated Branches: refs/heads/trunk 937f15e17 -> e05a5fc12
Add support for writing to multiple column families in CFOF Patch by Robbie Strickland; Reviewed by tjake for CASSANDRA-4208 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e05a5fc1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e05a5fc1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e05a5fc1 Branch: refs/heads/trunk Commit: e05a5fc12648f315002c9939a2a0748d74525589 Parents: 937f15e Author: Jake Luciani <[email protected]> Authored: Mon Oct 1 20:56:16 2012 +0200 Committer: Jake Luciani <[email protected]> Committed: Mon Oct 1 20:59:32 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + build.xml | 2 +- .../apache/cassandra/hadoop/BulkOutputFormat.java | 6 +- .../cassandra/hadoop/ColumnFamilyOutputFormat.java | 6 +- .../org/apache/cassandra/hadoop/ConfigHelper.java | 49 ++++++++++---- 5 files changed, 41 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 89ee88d..d3932b7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,7 @@ (CASSANDRA-4706) * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648) * Add support for batchlog in CQL3 (CASSANDRA-4545) + * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) 1.2-beta1 * add atomic_batch_mutate (CASSANDRA-4542, -4635) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 7c288cc..749029e 100644 --- a/build.xml +++ b/build.xml @@ -362,7 +362,7 @@ <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.6"> <exclusion groupId="commons-lang" 
artifactId="commons-lang"/> </dependency> - <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="0.20.203.0"/> + <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.2"/> <dependency groupId="org.apache.pig" artifactId="pig" version="0.9.2"/> <dependency groupId="net.sf.jopt-simple" artifactId="jopt-simple" version="3.2"/> <dependency groupId="net.java.dev.jna" artifactId="jna" version="3.2.7"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index 7a6a1d7..f4ad9d9 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -37,10 +37,8 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> private void checkOutputSpecs(Configuration conf) { - if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()"); - } + if (ConfigHelper.getOutputKeyspace(conf) == null) + throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java index b872082..64e080b 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java @@ 
-82,10 +82,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat private void checkOutputSpecs(Configuration conf) { - if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()"); - } + if (ConfigHelper.getOutputKeyspace(conf) == null) + throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); if (ConfigHelper.getOutputPartitioner(conf) == null) throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); if (ConfigHelper.getOutputInitialAddress(conf) == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e05a5fc1/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 6282143..a2a461b 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -58,7 +58,7 @@ public class ConfigHelper private static final String OUTPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.output.keyspace.username"; private static final String OUTPUT_KEYSPACE_PASSWD_CONFIG = "cassandra.output.keyspace.passwd"; private static final String INPUT_COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily"; - private static final String OUTPUT_COLUMNFAMILY_CONFIG = "cassandra.output.columnfamily"; + private static final String OUTPUT_COLUMNFAMILY_CONFIG = "mapreduce.output.basename"; //this must == OutputFormat.BASE_OUTPUT_NAME private static final String INPUT_PREDICATE_CONFIG = "cassandra.input.predicate"; private static final String INPUT_KEYRANGE_CONFIG = "cassandra.input.keyRange"; private static final String INPUT_SPLIT_SIZE_CONFIG =
"cassandra.input.split.size"; @@ -117,25 +117,43 @@ public class ConfigHelper } /** - * Set the keyspace and column family for the output of this job. + * Set the keyspace for the output of this job. * * @param conf Job configuration you are about to run * @param keyspace - * @param columnFamily */ - public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily) + public static void setOutputKeyspace(Configuration conf, String keyspace) { if (keyspace == null) { throw new UnsupportedOperationException("keyspace may not be null"); } - if (columnFamily == null) - { - throw new UnsupportedOperationException("columnfamily may not be null"); - } conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace); - conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily); + } + + /** + * Set the column family for the output of this job. + * + * @param conf Job configuration you are about to run + * @param columnFamily + */ + public static void setOutputColumnFamily(Configuration conf, String columnFamily) + { + conf.set(OUTPUT_COLUMNFAMILY_CONFIG, columnFamily); + } + + /** + * Set the column family for the output of this job.
+ * + * @param conf Job configuration you are about to run + * @param keyspace + * @param columnFamily + */ + public static void setOutputColumnFamily(Configuration conf, String keyspace, String columnFamily) + { + setOutputKeyspace(conf, keyspace); + setOutputColumnFamily(conf, columnFamily); } /** @@ -329,15 +347,18 @@ public class ConfigHelper { return conf.get(INPUT_COLUMNFAMILY_CONFIG); } - - public static boolean getInputIsWide(Configuration conf) + + public static String getOutputColumnFamily(Configuration conf) { - return Boolean.parseBoolean(conf.get(INPUT_WIDEROWS_CONFIG)); + if (conf.get(OUTPUT_COLUMNFAMILY_CONFIG) != null) + return conf.get(OUTPUT_COLUMNFAMILY_CONFIG); + else + throw new UnsupportedOperationException("You must set the output column family using either setOutputColumnFamily or by adding a named output with MultipleOutputs"); } - public static String getOutputColumnFamily(Configuration conf) + public static boolean getInputIsWide(Configuration conf) { - return conf.get(OUTPUT_COLUMNFAMILY_CONFIG); + return Boolean.valueOf(conf.get(INPUT_WIDEROWS_CONFIG)); } public static String getReadConsistencyLevel(Configuration conf)
