Updated Branches: refs/heads/cassandra-1.1 ea3b8da8f -> a7f1e7a62
Allow overriding default input/outputformats
Patch by brandonwilliams, reviewed by xedin for CASSANDRA-3826

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7f1e7a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7f1e7a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7f1e7a6

Branch: refs/heads/cassandra-1.1
Commit: a7f1e7a62d882a97fe15cbd8eee94e01333cddef
Parents: 0d768e1
Author: Brandon Williams <[email protected]>
Authored: Thu Feb 2 11:44:03 2012 -0600
Committer: Brandon Williams <[email protected]>
Committed: Thu Feb 2 13:06:12 2012 -0600

----------------------------------------------------------------------
 .../cassandra/hadoop/pig/CassandraStorage.java | 42 +++++++++++++--
 1 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7f1e7a6/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index eec516f..dc6ec90 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -71,7 +72,12 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
 
+    private final static String DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyInputFormat";
+    private final static String DEFAULT_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.ColumnFamilyOutputFormat";
+
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -86,6 +92,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private Configuration conf;
     private RecordReader reader;
     private RecordWriter writer;
+    private String inputFormatClass;
+    private String outputFormatClass;
     private int limit;
 
     public CassandraStorage()
@@ -247,7 +255,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     @Override
     public InputFormat getInputFormat()
     {
-        return new ColumnFamilyInputFormat();
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -329,12 +344,24 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         {
             ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
             ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
-        }
+        }
         if(System.getenv(PIG_INPUT_PARTITIONER) != null)
             ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
         if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
             ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
-
+        if (System.getenv(PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+    }
+
+    private String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
     }
 
     @Override
@@ -505,7 +532,14 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     public OutputFormat getOutputFormat()
     {
-        return new ColumnFamilyOutputFormat();
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public void checkSchema(ResourceSchema schema) throws IOException
