BulkOutputFormat now infers the column family type (super vs. standard) and column type (counter vs. regular) from the first mutation it writes. Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-3828.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35aad40b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35aad40b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35aad40b Branch: refs/heads/trunk Commit: 35aad40b05442b4c827cf8d373c65954d4b36748 Parents: cba4087 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Thu Feb 2 18:00:08 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Thu Feb 2 18:00:08 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/BulkRecordWriter.java | 84 +++++++++++---- 1 files changed, 65 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/35aad40b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index c8a3a4f..573ec8f 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -29,6 +29,7 @@ import java.net.UnknownHostException; import java.util.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.dht.Range; @@ -52,11 +53,25 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; private final static String STREAM_THROTTLE_MBITS = 
"mapreduce.output.bulkoutputformat.streamthrottlembits"; - private final static String IS_SUPERCF = "mapreduce.output.bulkoutputformat.issuper"; private final Configuration conf; - private boolean isSuper = false; private SSTableSimpleUnsortedWriter writer; private SSTableLoader loader; + private File outputdir; + + private enum CFType + { + NORMAL, + SUPER, + } + + private enum ColType + { + NORMAL, + COUNTER + } + + private CFType cfType; + private ColType colType; static { DatabaseDescriptor.initDefaultsOnly(); // make sure DD doesn't load yaml @@ -72,21 +87,8 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> this.conf = conf; DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0"))); String keyspace = ConfigHelper.getOutputKeyspace(conf); - File outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader + outputdir = new File(getOutputLocation() + File.separator + keyspace); //dir must be named by ks for the loader outputdir.mkdirs(); - this.isSuper = Boolean.valueOf(conf.get(IS_SUPERCF)); - AbstractType<?> subcomparator = null; - if (isSuper) - subcomparator = BytesType.instance; - this.writer = new SSTableSimpleUnsortedWriter( - outputdir, - keyspace, - ConfigHelper.getOutputColumnFamily(conf), - BytesType.instance, - subcomparator, - Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64"))); - - this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler()); } private String getOutputLocation() throws IOException @@ -97,21 +99,65 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> return dir; } + private void setTypes(Mutation mutation) + { + if (cfType == null) + { + if (mutation.getColumn_or_supercolumn().isSetSuper_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column()) + 
cfType = CFType.SUPER; + else + cfType = CFType.NORMAL; + if (mutation.getColumn_or_supercolumn().isSetCounter_column() || mutation.getColumn_or_supercolumn().isSetCounter_super_column()) + colType = ColType.COUNTER; + else + colType = ColType.NORMAL; + } + } + + private void prepareWriter() throws IOException + { + if (writer == null) + { + AbstractType<?> subcomparator = null; + if (cfType == CFType.SUPER) + subcomparator = BytesType.instance; + this.writer = new SSTableSimpleUnsortedWriter( + outputdir, + ConfigHelper.getOutputKeyspace(conf), + ConfigHelper.getOutputColumnFamily(conf), + BytesType.instance, + subcomparator, + Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64"))); + this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler()); + } + } @Override public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException { + setTypes(value.get(0)); + prepareWriter(); writer.newRow(keybuff); for (Mutation mut : value) { - if (isSuper) + if (cfType == CFType.SUPER) { writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name); for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns) - writer.addColumn(column.name, column.value, column.timestamp); + { + if (colType == ColType.COUNTER) + writer.addCounterColumn(column.name, column.value.getLong()); + else + writer.addColumn(column.name, column.value, column.timestamp); + } } else - writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); + { + if (colType == ColType.COUNTER) + writer.addCounterColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value.getLong()); + else + writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); 
+ } } }