Revert 5417 changes to hadoop

Patch by slebresne, reviewed by brandonwilliams for CASSANDRA-7241
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb1deac0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb1deac0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb1deac0

Branch: refs/heads/trunk
Commit: bb1deac0b9073ca3ee590731fbbf505867a890d1
Parents: e9e91d7
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed May 28 13:21:59 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed May 28 13:22:37 2014 -0500

----------------------------------------------------------------------
 .../hadoop/ColumnFamilyInputFormat.java      |  7 +-
 .../hadoop/ColumnFamilyRecordReader.java     | 70 ++++++++++----------
 .../hadoop/pig/AbstractCassandraStorage.java |  2 +-
 3 files changed, 39 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index a2c7a36..686d486 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,15 +44,15 @@ import org.apache.hadoop.mapreduce.*;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<CellName, Cell>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
 
-    public RecordReader<ByteBuffer, SortedMap<CellName, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
+    public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = HadoopCompat.newMapContext(
                 jobConf,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index b6b12ec..0b52904 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -30,8 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -45,8 +46,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
-public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<CellName, Cell>>
-    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<CellName, Cell>>
+public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
+    implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class);
@@ -54,7 +55,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private ColumnFamilySplit split;
     private RowIterator iter;
-    private Pair<ByteBuffer, SortedMap<CellName, Cell>> currentRow;
+    private Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> currentRow;
     private SlicePredicate predicate;
     private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
@@ -93,7 +94,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return currentRow.left;
     }
 
-    public SortedMap<CellName, Cell> getCurrentValue()
+    public SortedMap<ByteBuffer, Cell> getCurrentValue()
     {
         return currentRow.right;
     }
@@ -211,12 +212,12 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return split.getLocations()[0];
     }
 
-    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>>
+    private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
     {
         protected List<KeySlice> rows;
         protected int totalRead = 0;
         protected final boolean isSuper;
-        protected final CellNameType comparator;
+        protected final AbstractType<?> comparator;
         protected final AbstractType<?> subComparator;
         protected final IPartitioner partitioner;
@@ -254,7 +255,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                     cfDef.column_type = ByteBufferUtil.string(type);
                 }
 
-                comparator = CellNames.fromAbstractType(TypeParser.parse(cfDef.comparator_type), true);
+                comparator = TypeParser.parse(cfDef.comparator_type);
                 subComparator = cfDef.subcomparator_type == null ? null : TypeParser.parse(cfDef.subcomparator_type);
             }
             catch (ConfigurationException e)
@@ -298,21 +299,21 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (org.apache.cassandra.thrift.Column column : super_column.columns)
             {
                 Cell c = unthriftifySimple(column);
-                cells.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
+                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
             }
             return cells;
         }
 
         protected Cell unthriftifySimple(org.apache.cassandra.thrift.Column column)
         {
-            return new BufferCell(comparator.cellFromByteBuffer(column.name), column.value, column.timestamp);
+            return new BufferCell(CellNames.simpleDense(column.name), column.value, column.timestamp);
         }
 
         private Cell unthriftifyCounter(CounterColumn column)
         {
             //CounterColumns read the counterID from the System keyspace, so need the StorageService running and access
             //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Cell.
-            return new BufferCell(comparator.cellFromByteBuffer(column.name), ByteBufferUtil.bytes(column.value), 0);
+            return new BufferCell(CellNames.simpleDense(column.name), ByteBufferUtil.bytes(column.value), 0);
         }
 
         private List<Cell> unthriftifySuperCounter(CounterSuperColumn super_column)
@@ -321,7 +322,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             for (CounterColumn column : super_column.columns)
             {
                 Cell c = unthriftifyCounter(column);
-                cells.add(c.withUpdatedName(comparator.makeCellName(super_column.name, c.name().toByteBuffer())));
+                cells.add(c.withUpdatedName(CellNames.simpleDense(CompositeType.build(super_column.name, c.name().toByteBuffer()))));
             }
             return cells;
         }
@@ -402,7 +403,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
        {
             maybeInit();
             if (rows == null)
@@ -410,12 +411,13 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             totalRead++;
             KeySlice ks = rows.get(i++);
-            SortedMap<CellName, Cell> map = new TreeMap<CellName, Cell>(comparator);
+            AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
+            SortedMap<ByteBuffer, Cell> map = new TreeMap<ByteBuffer, Cell>(comp);
             for (ColumnOrSuperColumn cosc : ks.columns)
             {
                 List<Cell> cells = unthriftify(cosc);
                 for (Cell cell : cells)
-                    map.put(cell.name(), cell);
+                    map.put(cell.name().toByteBuffer(), cell);
             }
             return Pair.create(ks.key, map);
         }
@@ -423,8 +425,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
     private class WideRowIterator extends RowIterator
     {
-        private PeekingIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>> wideColumns;
-        private Composite lastColumn = Composites.EMPTY;
+        private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>> wideColumns;
+        private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
         private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         private void maybeInit()
@@ -453,7 +455,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
 
             try
             {
-                rows = client.get_paged_slice(cfName, keyRange, lastColumn.toByteBuffer(), consistencyLevel);
+                rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel);
                 int n = 0;
                 for (KeySlice row : rows)
                     n += row.columns.size();
@@ -472,14 +474,14 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+        protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
         {
             maybeInit();
             if (rows == null)
                 return endOfData();
 
-            Pair<ByteBuffer, SortedMap<CellName, Cell>> next = wideColumns.next();
-            lastColumn = next.right.values().iterator().next().name();
+            Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next = wideColumns.next();
+            lastColumn = next.right.keySet().iterator().next().duplicate();
 
             maybeIncreaseRowCounter(next);
             return next;
@@ -490,7 +492,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
          * Increases the row counter only if we really moved to the next row.
          * @param next just fetched row slice
          */
-        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<CellName, Cell>> next)
+        private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> next)
         {
             ByteBuffer currentKey = next.left;
             if (!currentKey.equals(lastCountedKey))
@@ -500,7 +502,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             }
         }
 
-        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<CellName, Cell>>>
+        private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>>>
         {
             private final Iterator<KeySlice> rows;
             private Iterator<ColumnOrSuperColumn> columns;
@@ -521,30 +523,28 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
                 columns = currentRow.columns.iterator();
             }
 
-            protected Pair<ByteBuffer, SortedMap<CellName, Cell>> computeNext()
+            protected Pair<ByteBuffer, SortedMap<ByteBuffer, Cell>> computeNext()
             {
-                CellNameType cellType = subComparator == null
-                                      ? comparator
-                                      : new CompoundDenseCellNameType(Arrays.asList(comparator.asAbstractType(), subComparator));
+                AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator;
                 while (true)
                 {
                     if (columns.hasNext())
                     {
                         ColumnOrSuperColumn cosc = columns.next();
-                        SortedMap<CellName, Cell> map;
+                        SortedMap<ByteBuffer, Cell> map;
                         List<Cell> cells = unthriftify(cosc);
                         if (cells.size() == 1)
                         {
-                            map = ImmutableSortedMap.of(cells.get(0).name(), cells.get(0));
+                            map = ImmutableSortedMap.of(cells.get(0).name().toByteBuffer(), cells.get(0));
                         }
                         else
                         {
                             assert isSuper;
-                            map = new TreeMap<CellName, Cell>(cellType);
+                            map = new TreeMap<ByteBuffer, Cell>(comp);
                             for (Cell cell : cells)
-                                map.put(cell.name(), cell);
+                                map.put(cell.name().toByteBuffer(), cell);
                         }
-                        return Pair.<ByteBuffer, SortedMap<CellName, Cell>>create(currentRow.key, map);
+                        return Pair.<ByteBuffer, SortedMap<ByteBuffer, Cell>>create(currentRow.key, map);
                     }
 
                     if (!rows.hasNext())
@@ -561,7 +561,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
     // to the old. Thus, expect a small performance hit.
    // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
    // and ColumnFamilyRecordReader don't support them, it should be fine for now.
-    public boolean next(ByteBuffer key, SortedMap<CellName, Cell> value) throws IOException
+    public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Cell> value) throws IOException
     {
         if (this.nextKeyValue())
         {
@@ -582,9 +582,9 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
         return ByteBuffer.wrap(new byte[this.keyBufferSize]);
     }
 
-    public SortedMap<CellName, Cell> createValue()
+    public SortedMap<ByteBuffer, Cell> createValue()
     {
-        return new TreeMap<CellName, Cell>();
+        return new TreeMap<ByteBuffer, Cell>();
     }
 
     public long getPos() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb1deac0/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 9737d67..361baa4 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -130,7 +130,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         if(comparator instanceof AbstractCompositeType)
             setTupleValue(pair, 0, composeComposite((AbstractCompositeType)comparator,colName));
         else
-            setTupleValue(pair, 0, cassandraToObj(comparator, col.name().toByteBuffer()));
+            setTupleValue(pair, 0, cassandraToObj(comparator, colName));
 
         // value
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
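
For reference, a minimal sketch (not part of this commit) of a mapper consuming
ColumnFamilyInputFormat after the revert: values arrive keyed by the raw ByteBuffer
cell name again instead of CellName. The class name, output types, and the UTF-8
column comparator are all illustrative assumptions, not anything this patch ships.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Map;
import java.util.SortedMap;

import org.apache.cassandra.db.Cell;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical consumer: the key/value types must match the reverted
// ColumnFamilyInputFormat signature <ByteBuffer, SortedMap<ByteBuffer, Cell>>.
public class CellNameCountMapper
        extends Mapper<ByteBuffer, SortedMap<ByteBuffer, Cell>, Text, LongWritable>
{
    private static final LongWritable ONE = new LongWritable(1);

    @Override
    public void map(ByteBuffer key, SortedMap<ByteBuffer, Cell> columns, Context context)
            throws IOException, InterruptedException
    {
        for (Map.Entry<ByteBuffer, Cell> entry : columns.entrySet())
        {
            try
            {
                // Cell names are plain ByteBuffers again; decoding as UTF-8
                // assumes the column family uses a UTF8Type comparator.
                context.write(new Text(ByteBufferUtil.string(entry.getKey())), ONE);
            }
            catch (CharacterCodingException e)
            {
                throw new IOException(e);
            }
        }
    }
}

Note that java.nio.ByteBuffer implements Comparable, which is why the reverted
createValue() can return a plain new TreeMap<ByteBuffer, Cell>() with no explicit
comparator on the old-API next() path.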
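Likewise, since the super-column paths (unthriftifySuper/unthriftifySuperCounter)
now pack the super-column name and sub-column name into a single buffer via
CompositeType.build(...), a consumer can split the two components back out. A
hypothetical helper, assuming exactly the two-component layout built above:

import java.nio.ByteBuffer;

import org.apache.cassandra.db.marshal.CompositeType;

// Hypothetical helper: component 0 is the super column name, component 1 the
// sub-column name, matching the order passed to CompositeType.build() above.
public final class SuperColumnNames
{
    private SuperColumnNames() {}

    public static ByteBuffer superColumnName(ByteBuffer compositeName)
    {
        // extractComponent returns null if the buffer has fewer components.
        return CompositeType.extractComponent(compositeName.duplicate(), 0);
    }

    public static ByteBuffer subColumnName(ByteBuffer compositeName)
    {
        return CompositeType.extractComponent(compositeName.duplicate(), 1);
    }
}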