http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java index dff9ef2..22d40d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.mapreduce; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import java.util.List; @@ -43,6 +44,6 @@ public interface ImportPreUpsertKeyValueProcessor { * @param keyValues list of KeyValues that are to be written to an HFile * @return the list that will actually be written */ - List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues); + List<Cell> preUpsert(byte[] rowKey, List<Cell> keyValues); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java index c888b7d..bb38923 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java @@ -72,6 +72,7 @@ import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; import org.apache.phoenix.mapreduce.bulkload.TargetTableRef; import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +145,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce @Override public void write(TableRowkeyPair row, V cell) throws IOException { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = KeyValueUtil.maybeCopyCell(cell); // null input == user explicitly wants to flush if (row == null && kv == null) { rollWriters(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 9e0d629..6f469e6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -37,6 +38,7 @@ import org.apache.phoenix.mapreduce.PhoenixJobCounters; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,18 +102,18 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD indxWritable.write(this.pStatement); this.pStatement.execute(); - final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true); + final Iterator<Pair<byte[], List<Cell>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true); while (uncommittedDataIterator.hasNext()) { - Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); + Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next(); if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) { // skip edits for other tables continue; } - List<KeyValue> keyValueList = kvPair.getSecond(); + List<Cell> keyValueList = kvPair.getSecond(); keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); - for (KeyValue kv : keyValueList) { + for (Cell kv : keyValueList) { outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); - context.write(outputKey, kv); + context.write(outputKey, PhoenixKeyValueUtil.maybeCopyCell(kv)); } context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index 5b85da5..60f07b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -125,7 +125,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(del); } - del.addDeleteMarker(cell); + del.add(cell); } } // Write Mutation Batch http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index fd84c7c..0b48376 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -1034,7 +1034,6 @@ public class PTableImpl implements PTable { } } - @SuppressWarnings("deprecation") @Override public void delete() { newMutations(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java index 9598ace..2d9f339 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Append; @@ -53,7 +54,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.apache.phoenix.util.SequenceUtil; import com.google.common.collect.Lists; @@ -66,14 +67,14 @@ public class Sequence { public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE}; // create empty Sequence key values used while created a sequence row - private static final KeyValue CURRENT_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CURRENT_VALUE_BYTES); - private static final KeyValue INCREMENT_BY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, INCREMENT_BY_BYTES); - private static final KeyValue CACHE_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CACHE_SIZE_BYTES); - private static final KeyValue MIN_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, MIN_VALUE_BYTES); - private static final KeyValue MAX_VALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, MAX_VALUE_BYTES); - private static final KeyValue CYCLE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CYCLE_FLAG_BYTES); - private static final KeyValue LIMIT_REACHED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, LIMIT_REACHED_FLAG_BYTES); - private static final List<KeyValue> SEQUENCE_KV_COLUMNS = Arrays.<KeyValue>asList( + private static final Cell CURRENT_VALUE_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CURRENT_VALUE_BYTES); + private static final Cell INCREMENT_BY_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, INCREMENT_BY_BYTES); + private static final Cell CACHE_SIZE_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CACHE_SIZE_BYTES); + private static final Cell MIN_VALUE_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, MIN_VALUE_BYTES); + private static final Cell MAX_VALUE_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, MAX_VALUE_BYTES); + private static final Cell CYCLE_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, CYCLE_FLAG_BYTES); + private static final Cell LIMIT_REACHED_KV = org.apache.hadoop.hbase.KeyValueUtil.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, SYSTEM_SEQUENCE_FAMILY_BYTES, LIMIT_REACHED_FLAG_BYTES); + private static final List<Cell> SEQUENCE_KV_COLUMNS = Arrays.<Cell>asList( CURRENT_VALUE_KV, INCREMENT_BY_KV, CACHE_SIZE_KV, @@ -84,7 +85,7 @@ public class Sequence { LIMIT_REACHED_KV ); static { - Collections.sort(SEQUENCE_KV_COLUMNS, KeyValue.COMPARATOR); + Collections.sort(SEQUENCE_KV_COLUMNS, CellComparatorImpl.COMPARATOR); } // Pre-compute index of sequence key values to prevent binary search private static final int CURRENT_VALUE_INDEX = SEQUENCE_KV_COLUMNS.indexOf(CURRENT_VALUE_KV); @@ -301,8 +302,8 @@ public class Sequence { append.setAttribute(SequenceRegionObserver.CURRENT_VALUE_ATTRIB, PLong.INSTANCE.toBytes(value.nextValue)); Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap(); familyMap.put(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, Arrays.<Cell>asList( - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, PLong.INSTANCE.toBytes(value.currentValue)), - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, PBoolean.INSTANCE.toBytes(value.limitReached)) + (Cell)PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, value.timestamp, PLong.INSTANCE.toBytes(value.currentValue)), + (Cell)PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, value.timestamp, PBoolean.INSTANCE.toBytes(value.limitReached)) )); return append; } @@ -363,7 +364,7 @@ public class Sequence { } catch (IOException e) { throw new RuntimeException(e); // Impossible } - for (KeyValue kv : SEQUENCE_KV_COLUMNS) { + for (Cell kv : SEQUENCE_KV_COLUMNS) { try { // Store the timestamp on the cell as well as HBase 1.2 seems to not // be serializing over the time range (see HBASE-15698). @@ -388,64 +389,63 @@ public class Sequence { * @param cellIndex index of the KeyValue to be returned (if the sequence row is from a previous version * @return KeyValue */ - private static KeyValue getKeyValue(Result r, KeyValue kv, int cellIndex) { + private static Cell getKeyValue(Result r, Cell kv, int cellIndex) { Cell[] cells = r.rawCells(); // if the sequence row is from a previous version then MIN_VALUE, MAX_VALUE, CYCLE and LIMIT_REACHED key values are not present, // the sequence row has only three columns (INCREMENT_BY, CACHE_SIZE and CURRENT_VALUE) and the order of the cells // in the array returned by rawCells() is not what what we expect so use getColumnLatestCell() to get the cell we want - Cell cell = cells.length == NUM_SEQUENCE_KEY_VALUES ? cells[cellIndex] : + return cells.length == NUM_SEQUENCE_KEY_VALUES ? cells[cellIndex] : r.getColumnLatestCell(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); - return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell); } - private static KeyValue getKeyValue(Result r, KeyValue kv) { + private static Cell getKeyValue(Result r, Cell kv) { return getKeyValue(r, kv, SEQUENCE_KV_COLUMNS.indexOf(kv)); } - public static KeyValue getCurrentValueKV(Result r) { + public static Cell getCurrentValueKV(Result r) { return getKeyValue(r, CURRENT_VALUE_KV, CURRENT_VALUE_INDEX); } - public static KeyValue getIncrementByKV(Result r) { + public static Cell getIncrementByKV(Result r) { return getKeyValue(r, INCREMENT_BY_KV, INCREMENT_BY_INDEX); } - public static KeyValue getCacheSizeKV(Result r) { + public static Cell getCacheSizeKV(Result r) { return getKeyValue(r, CACHE_SIZE_KV, CACHE_SIZE_INDEX); } - public static KeyValue getMinValueKV(Result r) { + public static Cell getMinValueKV(Result r) { return getKeyValue(r, MIN_VALUE_KV, MIN_VALUE_INDEX); } - public static KeyValue getMaxValueKV(Result r) { + public static Cell getMaxValueKV(Result r) { return getKeyValue(r, MAX_VALUE_KV, MAX_VALUE_INDEX); } - public static KeyValue getCycleKV(Result r) { + public static Cell getCycleKV(Result r) { return getKeyValue(r, CYCLE_KV, CYCLE_INDEX); } - public static KeyValue getLimitReachedKV(Result r) { + public static Cell getLimitReachedKV(Result r) { return getKeyValue(r, LIMIT_REACHED_KV, LIMIT_REACHED_INDEX); } - public static void replaceCurrentValueKV(List<Cell> kvs, KeyValue currentValueKV) { + public static void replaceCurrentValueKV(List<Cell> kvs, Cell currentValueKV) { kvs.set(CURRENT_VALUE_INDEX, currentValueKV); } - public static void replaceMinValueKV(List<Cell> kvs, KeyValue minValueKV) { + public static void replaceMinValueKV(List<Cell> kvs, Cell minValueKV) { kvs.set(MIN_VALUE_INDEX, minValueKV); } - public static void replaceMaxValueKV(List<Cell> kvs, KeyValue maxValueKV) { + public static void replaceMaxValueKV(List<Cell> kvs, Cell maxValueKV) { kvs.set(MAX_VALUE_INDEX, maxValueKV); } - public static void replaceCycleValueKV(List<Cell> kvs, KeyValue cycleValueKV) { + public static void replaceCycleValueKV(List<Cell> kvs, Cell cycleValueKV) { kvs.set(CYCLE_INDEX, cycleValueKV); } - public static void replaceLimitReachedKV(List<Cell> kvs, KeyValue limitReachedKV) { + public static void replaceLimitReachedKV(List<Cell> kvs, Cell limitReachedKV) { kvs.set(LIMIT_REACHED_INDEX, limitReachedKV); } @@ -459,7 +459,7 @@ public class Sequence { return Lists.newArrayList(r.rawCells()); // else we need to handle missing MIN_VALUE, MAX_VALUE, CYCLE and LIMIT_REACHED KeyValues List<Cell> cellList = Lists.newArrayListWithCapacity(NUM_SEQUENCE_KEY_VALUES); - for (KeyValue kv : SEQUENCE_KV_COLUMNS) { + for (Cell kv : SEQUENCE_KV_COLUMNS) { cellList.add(getKeyValue(r,kv)); } return cellList; @@ -502,12 +502,12 @@ public class Sequence { } public SequenceValue(Result r, ValueOp op, long numToAllocate) { - KeyValue currentValueKV = getCurrentValueKV(r); - KeyValue incrementByKV = getIncrementByKV(r); - KeyValue cacheSizeKV = getCacheSizeKV(r); - KeyValue minValueKV = getMinValueKV(r); - KeyValue maxValueKV = getMaxValueKV(r); - KeyValue cycleKV = getCycleKV(r); + Cell currentValueKV = getCurrentValueKV(r); + Cell incrementByKV = getIncrementByKV(r); + Cell cacheSizeKV = getCacheSizeKV(r); + Cell minValueKV = getMinValueKV(r); + Cell maxValueKV = getMaxValueKV(r); + Cell cycleKV = getCycleKV(r); this.timestamp = currentValueKV.getTimestamp(); this.nextValue = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault()); this.incrementBy = PLong.INSTANCE.getCodec().decodeLong(incrementByKV.getValueArray(), incrementByKV.getValueOffset(), SortOrder.getDefault()); @@ -563,15 +563,15 @@ public class Sequence { Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap(); byte[] startWithBuf = PLong.INSTANCE.toBytes(startWith); familyMap.put(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, Arrays.<Cell>asList( - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, PLong.INSTANCE.toBytes(incrementBy)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, PLong.INSTANCE.toBytes(cacheSize)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, PLong.INSTANCE.toBytes(minValue)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, PLong.INSTANCE.toBytes(maxValue)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PBoolean.INSTANCE.toBytes(cycle)), - KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, PDataType.FALSE_BYTES) + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CURRENT_VALUE_BYTES, timestamp, startWithBuf), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.START_WITH_BYTES, timestamp, startWithBuf), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.INCREMENT_BY_BYTES, timestamp, PLong.INSTANCE.toBytes(incrementBy)), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CACHE_SIZE_BYTES, timestamp, PLong.INSTANCE.toBytes(cacheSize)), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_VALUE_BYTES, timestamp, PLong.INSTANCE.toBytes(minValue)), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_VALUE_BYTES, timestamp, PLong.INSTANCE.toBytes(maxValue)), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.CYCLE_FLAG_BYTES, timestamp, PBoolean.INSTANCE.toBytes(cycle)), + PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG_BYTES, timestamp, PDataType.FALSE_BYTES) )); return append; } @@ -601,7 +601,7 @@ public class Sequence { } Map<byte[], List<Cell>> familyMap = append.getFamilyCellMap(); familyMap.put(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, Arrays.<Cell>asList( - (Cell)KeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY))); + (Cell)PhoenixKeyValueUtil.newKeyValue(key, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timestamp, ByteUtil.EMPTY_BYTE_ARRAY))); return append; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java index 4e4978c..c14759f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Get; @@ -311,12 +312,11 @@ class DefaultStatisticsCollector implements StatisticsCollector { incrementRow = true; } for (Cell cell : results) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); + maxTimeStamp = Math.max(maxTimeStamp, cell.getTimestamp()); Pair<Long, GuidePostsInfoBuilder> gps; if (cachedGuidePosts == null) { - ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(), - kv.getFamilyLength()); + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength()); gps = guidePostsInfoWriterMap.get(cfKey); if (gps == null) { gps = new Pair<Long, GuidePostsInfoBuilder>(0l, @@ -334,7 +334,7 @@ class DefaultStatisticsCollector implements StatisticsCollector { incrementRow = false; } } - int kvLength = kv.getLength(); + int kvLength = CellUtil.estimatedSerializedSizeOf(cell); long byteCount = gps.getFirst() + kvLength; gps.setFirst(byteCount); if (byteCount >= guidePostDepth) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java index bde049b..bfa63ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/MultiKeyValueTuple.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; -import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; /** * Tuple that can be used to represent a list of cells. It is imperative that the list of cells @@ -59,7 +59,7 @@ public class MultiKeyValueTuple extends BaseTuple { @Override public Cell getValue(byte[] family, byte[] qualifier) { - return KeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, values, family, qualifier); + return PhoenixKeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, values, family, qualifier); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java index 63ba101..276c72d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedResultTuple.java @@ -48,8 +48,8 @@ public class PositionBasedResultTuple extends BaseTuple { } @Override - public KeyValue getValue(byte[] family, byte[] qualifier) { - return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cells.getCellForColumnQualifier(qualifier)); + public Cell getValue(byte[] family, byte[] qualifier) { + return cells.getCellForColumnQualifier(qualifier); } @Override @@ -81,14 +81,14 @@ public class PositionBasedResultTuple extends BaseTuple { } @Override - public KeyValue getValue(int index) { - return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(index == 0 ? cells.getFirstCell() : cells.get(index)); + public Cell getValue(int index) { + return index == 0 ? cells.getFirstCell() : cells.get(index); } @Override public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr) { - KeyValue kv = getValue(family, qualifier); + Cell kv = getValue(family, qualifier); if (kv == null) return false; ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java index 3774837..3419e3c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; -import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; /** * @@ -55,10 +55,9 @@ public class ResultTuple extends BaseTuple { } @Override - public KeyValue getValue(byte[] family, byte[] qualifier) { - Cell cell = KeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, + public Cell getValue(byte[] family, byte[] qualifier) { + return PhoenixKeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, result.rawCells(), family, qualifier); - return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell); } @Override @@ -91,14 +90,13 @@ public class ResultTuple extends BaseTuple { @Override public KeyValue getValue(int index) { - return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue( - result.rawCells()[index]); + return PhoenixKeyValueUtil.maybeCopyCell(result.rawCells()[index]); } @Override public boolean getValue(byte[] family, byte[] qualifier, ImmutableBytesWritable ptr) { - KeyValue kv = getValue(family, qualifier); + Cell kv = getValue(family, qualifier); if (kv == null) return false; ptr.set(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index cacf4c4..c26d2cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -285,7 +285,7 @@ public class IndexUtil { regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); } - Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<KeyValue>emptyList(), ts, regionStartKey, regionEndkey); + Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<Cell>emptyList(), ts, regionStartKey, regionEndkey); delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY)); indexMutations.add(delete); } @@ -558,8 +558,8 @@ public class IndexUtil { byte[] value = tupleProjector.getSchema().toBytes(joinTuple, tupleProjector.getExpressions(), tupleProjector.getValueBitSet(), ptr); - KeyValue keyValue = - KeyValueUtil.newKeyValue(firstCell.getRowArray(),firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY, + Cell keyValue = + PhoenixKeyValueUtil.newKeyValue(firstCell.getRowArray(),firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length); result.add(keyValue); } @@ -658,31 +658,6 @@ public class IndexUtil { public int getTagsLength() { return cell.getTagsLength(); } - - @Override - public long getMvccVersion() { - return cell.getMvccVersion(); - } - - @Override - public byte[] getValue() { - return cell.getValue(); - } - - @Override - public byte[] getFamily() { - return cell.getFamily(); - } - - @Override - public byte[] getQualifier() { - return cell.getQualifier(); - } - - @Override - public byte[] getRow() { - return cell.getRow(); - } }; itr.set(newCell); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java deleted file mode 100644 index 4234df5..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.util; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.execute.MutationState.RowMutationState; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; - -/** - * - * Utilities for KeyValue. Where there's duplication with KeyValue methods, - * these avoid creating new objects when not necessary (primary preventing - * byte array copying). - * - * - * @since 0.1 - */ -public class KeyValueUtil { - private KeyValueUtil() { - } - - public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { - return new KeyValue(key, 0, key.length, - cf, 0, cf.length, - cq, 0, cq.length, - ts, Type.Put, - value, valueOffset, valueLength); - } - - public static KeyValue newKeyValue(ImmutableBytesWritable key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { - return new KeyValue(key.get(), key.getOffset(), key.getLength(), - cf, 0, cf.length, - cq, 0, cq.length, - ts, Type.Put, - value, valueOffset, valueLength); - } - - public static KeyValue newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { - return new KeyValue(key, keyOffset, keyLength, - cf, 0, cf.length, - cq, 0, cq.length, - ts, Type.Put, - value, valueOffset, valueLength); - } - - public static KeyValue newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, - int cfOffset, int cfLength, byte[] cq, int cqOffset, int cqLength, long ts, byte[] value, - int valueOffset, int valueLength) { - return new KeyValue(key, keyOffset, keyLength, - cf, cfOffset, cfLength, - cq, cqOffset, cqLength, - ts, Type.Put, - value, valueOffset, valueLength); - } - - public static KeyValue newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) { - return newKeyValue(key, cf, cq, ts, value, 0, value.length); - } - - /** - * Binary search for latest column value without allocating memory in the process - * @param kvBuilder TODO - * @param kvs - * @param family - * @param qualifier - */ - public static Cell getColumnLatest(KeyValueBuilder kvBuilder, List<Cell>kvs, byte[] family, byte[] qualifier) { - if (kvs.size() == 0) { - return null; - } - assert CellUtil.matchingRow(kvs.get(0), kvs.get(kvs.size()-1)); - - Comparator<Cell> comp = new SearchComparator(kvBuilder, family, qualifier); - int pos = Collections.binarySearch(kvs, null, comp); - if (pos < 0 || pos == kvs.size()) { - return null; // doesn't exist - } - - return kvs.get(pos); - } - - - /** - * Binary search for latest column value without allocating memory in the process - * @param kvBuilder TODO - * @param kvs - * @param family - * @param qualifier - */ - public static Cell getColumnLatest(KeyValueBuilder kvBuilder, Cell[] kvs, byte[] family, byte[] qualifier) { - if (kvs.length == 0) { - return null; - } - assert CellUtil.matchingRow(kvs[0], kvs[kvs.length-1]); - - Comparator<Cell> comp = new SearchComparator(kvBuilder, family, qualifier); - int pos = Arrays.binarySearch(kvs, null, comp); - if (pos < 0 || pos == kvs.length) { - return null; // doesn't exist - } - - return kvs[pos]; - } - - /* - * Special comparator, *only* works for binary search. - * - * We make the following assumption: - * 1. All KVs compared have the same row key - * 2. For each (rowkey, family, qualifier) there is at most one version - * 3. Current JDKs only uses the search term on the right side - * - * #1 allows us to avoid row key comparisons altogether. - * #2 allows for exact matches - * #3 lets us save instanceof checks, and allows to inline the search term in the comparator - */ - private static class SearchComparator implements Comparator<Cell> { - private final KeyValueBuilder kvBuilder; - private final byte[] family; - private final byte[] qualifier; - - public SearchComparator(KeyValueBuilder kvBuilder, byte[] f, byte[] q) { - this.kvBuilder = kvBuilder; - family = f; - qualifier = q; - } - - @Override - public int compare(final Cell l, final Cell ignored) { - assert ignored == null; - // family - int val = kvBuilder.compareFamily(l, family, 0, family.length); - if (val != 0) { - return val; - } - // qualifier - return kvBuilder.compareQualifier(l, qualifier, 0, qualifier.length); - } - } - - /** - * Calculate the size a mutation will likely take when stored in HBase - * @param m The Mutation - * @return the disk size of the passed mutation - */ - public static long calculateMutationDiskSize(Mutation m) { - long size = 0; - for (Entry<byte [], List<Cell>> entry : m.getFamilyCellMap().entrySet()) { - for (Cell c : entry.getValue()) { - size += org.apache.hadoop.hbase.KeyValueUtil.length(c); - } - } - return size; - } - - /** - * Estimates the storage size of a row - * @param mutations map from table to row to RowMutationState - * @return estimated row size - */ - public static long - getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { - long size = 0; - // iterate over tables - for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations - .entrySet()) { - PTable table = tableEntry.getKey().getTable(); - // iterate over rows - for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue() - .entrySet()) { - int rowLength = rowEntry.getKey().getLength(); - Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); - switch (table.getImmutableStorageScheme()) { - case ONE_CELL_PER_COLUMN: - // iterate over columns - for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { - PColumn pColumn = colValueEntry.getKey(); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - pColumn.getFamilyName().getBytes().length, - pColumn.getColumnQualifierBytes().length, - colValueEntry.getValue().length); - } - break; - case SINGLE_CELL_ARRAY_WITH_OFFSETS: - // we store all the column values in a single key value that contains all the - // column values followed by an offset array - size += - PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, - colValueMap); - break; - } - // count the empty key value - Pair<byte[], byte[]> emptyKeyValueInfo = - EncodedColumnsUtil.getEmptyKeyValueInfo(table); - size += - KeyValue.getKeyValueDataStructureSize(rowLength, - SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), - emptyKeyValueInfo.getFirst().length, - emptyKeyValueInfo.getSecond().length); - } - } - return size; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java index 69eb5bc..7914e3e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java @@ -254,7 +254,7 @@ public class MetaDataUtil { List<Cell> kvs = headerRow.getFamilyCellMap().get(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES); if (kvs != null) { for (Cell cell : kvs) { - KeyValue kv = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(cell); if (builder.compareQualifier(kv, key, 0, key.length) ==0) { builder.getValueAsPtr(kv, ptr); return true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java new file mode 100644 index 0000000..d532130 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder.DataType; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; + +/** + * + * Utilities for KeyValue. Where there's duplication with KeyValue methods, + * these avoid creating new objects when not necessary (primary preventing + * byte array copying). + * + * + * @since 0.1 + */ +public class PhoenixKeyValueUtil { + private PhoenixKeyValueUtil() { + } + + public static Cell newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(key).setFamily(cf) + .setQualifier(cq).setTimestamp(ts).setType(DataType.Put) + .setValue(value, valueOffset, valueLength).build(); + } + + public static Cell newKeyValue(ImmutableBytesWritable key, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(key.get(), key.getOffset(), key.getLength()).setFamily(cf).setQualifier(cq) + .setTimestamp(ts).setType(DataType.Put).setValue(value, valueOffset, valueLength) + .build(); + } + + public static Cell newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, byte[] cq, long ts, byte[] value, int valueOffset, int valueLength) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(key, keyOffset, keyLength).setFamily(cf).setQualifier(cq).setTimestamp(ts) + .setType(DataType.Put).setValue(value, valueOffset, valueLength).build(); + } + + public static Cell newKeyValue(byte[] key, int keyOffset, int keyLength, byte[] cf, + int cfOffset, int cfLength, byte[] cq, int cqOffset, int cqLength, long ts, byte[] value, + int valueOffset, int valueLength) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(key, keyOffset, keyLength).setFamily(cf, cfOffset, cfLength) + .setQualifier(cq, cqOffset, cqLength).setTimestamp(ts) + .setValue(value, valueOffset, valueLength).build(); + } + + public static Cell newKeyValue(byte[] key, byte[] cf, byte[] cq, long ts, byte[] value) { + return newKeyValue(key, cf, cq, ts, value, 0, value.length); + } + + /** + * Binary search for latest column value without allocating memory in the process + * @param kvBuilder TODO + * @param kvs + * @param family + * @param qualifier + */ + public static Cell getColumnLatest(KeyValueBuilder kvBuilder, List<Cell>kvs, byte[] family, byte[] qualifier) { + if (kvs.size() == 0) { + return null; + } + assert CellUtil.matchingRows(kvs.get(0), kvs.get(kvs.size()-1)); + + Comparator<Cell> comp = new SearchComparator(kvBuilder, family, qualifier); + int pos = Collections.binarySearch(kvs, null, comp); + if (pos < 0 || pos == kvs.size()) { + return null; // doesn't exist + } + + return kvs.get(pos); + } + + + /** + * Binary search for latest column value without allocating memory in the process + * @param kvBuilder TODO + * @param kvs + * @param family + * @param qualifier + */ + public static Cell getColumnLatest(KeyValueBuilder kvBuilder, Cell[] kvs, byte[] family, byte[] qualifier) { + if (kvs.length == 0) { + return null; + } + assert CellUtil.matchingRows(kvs[0], kvs[kvs.length-1]); + + Comparator<Cell> comp = new SearchComparator(kvBuilder, family, qualifier); + int pos = Arrays.binarySearch(kvs, null, comp); + if (pos < 0 || pos == kvs.length) { + return null; // doesn't exist + } + + return kvs[pos]; + } + + /* + * Special comparator, *only* works for binary search. + * + * We make the following assumption: + * 1. All KVs compared have the same row key + * 2. For each (rowkey, family, qualifier) there is at most one version + * 3. Current JDKs only uses the search term on the right side + * + * #1 allows us to avoid row key comparisons altogether. + * #2 allows for exact matches + * #3 lets us save instanceof checks, and allows to inline the search term in the comparator + */ + private static class SearchComparator implements Comparator<Cell> { + private final KeyValueBuilder kvBuilder; + private final byte[] family; + private final byte[] qualifier; + + public SearchComparator(KeyValueBuilder kvBuilder, byte[] f, byte[] q) { + this.kvBuilder = kvBuilder; + family = f; + qualifier = q; + } + + @Override + public int compare(final Cell l, final Cell ignored) { + assert ignored == null; + // family + int val = kvBuilder.compareFamily(l, family, 0, family.length); + if (val != 0) { + return val; + } + // qualifier + return kvBuilder.compareQualifier(l, qualifier, 0, qualifier.length); + } + } + + /** + * Calculate the size a mutation will likely take when stored in HBase + * @param m The Mutation + * @return the disk size of the passed mutation + */ + public static long calculateMutationDiskSize(Mutation m) { + long size = 0; + for (Entry<byte [], List<Cell>> entry : m.getFamilyCellMap().entrySet()) { + for (Cell c : entry.getValue()) { + size += org.apache.hadoop.hbase.KeyValueUtil.length(c); + } + } + return size; + } + + /** + * Estimates the storage size of a row + * @param mutations map from table to row to RowMutationState + * @return estimated row size + */ + public static long + getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { + long size = 0; + // iterate over tables + for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations + .entrySet()) { + PTable table = tableEntry.getKey().getTable(); + // iterate over rows + for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue() + .entrySet()) { + int rowLength = rowEntry.getKey().getLength(); + Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); + switch (table.getImmutableStorageScheme()) { + case ONE_CELL_PER_COLUMN: + // iterate over columns + for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) { + PColumn pColumn = colValueEntry.getKey(); + size += + KeyValue.getKeyValueDataStructureSize(rowLength, + pColumn.getFamilyName().getBytes().length, + pColumn.getColumnQualifierBytes().length, + colValueEntry.getValue().length); + } + break; + case SINGLE_CELL_ARRAY_WITH_OFFSETS: + // we store all the column values in a single key value that contains all the + // column values followed by an offset array + size += + PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, + colValueMap); + break; + } + // count the empty key value + Pair<byte[], byte[]> emptyKeyValueInfo = + EncodedColumnsUtil.getEmptyKeyValueInfo(table); + size += + KeyValue.getKeyValueDataStructureSize(rowLength, + SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), + emptyKeyValueInfo.getFirst().length, + emptyKeyValueInfo.getSecond().length); + } + } + return size; + } + + public static KeyValue maybeCopyCell(Cell c) { + // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something + // that will likely be removed at some point in time. + if (c == null) return null; + if (c instanceof KeyValue) { + return (KeyValue) c; + } + return KeyValueUtil.copyToNewKeyValue(c); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 16ef206..6b5a73a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -53,7 +53,6 @@ import org.apache.commons.cli.PosixParser; import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; @@ -358,8 +357,8 @@ public class PhoenixRuntime { * @throws SQLException */ @Deprecated - public static List<KeyValue> getUncommittedData(Connection conn) throws SQLException { - Iterator<Pair<byte[],List<KeyValue>>> iterator = getUncommittedDataIterator(conn); + public static List<Cell> getUncommittedData(Connection conn) throws SQLException { + Iterator<Pair<byte[],List<Cell>>> iterator = getUncommittedDataIterator(conn); if (iterator.hasNext()) { return iterator.next().getSecond(); } @@ -373,7 +372,7 @@ public class PhoenixRuntime { * @return the list of HBase mutations for uncommitted data * @throws SQLException */ - public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn) throws SQLException { + public static Iterator<Pair<byte[],List<Cell>>> getUncommittedDataIterator(Connection conn) throws SQLException { return getUncommittedDataIterator(conn, false); } @@ -384,10 +383,10 @@ public class PhoenixRuntime { * @return the list of HBase mutations for uncommitted data * @throws SQLException */ - public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException { + public static Iterator<Pair<byte[],List<Cell>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException { final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(includeMutableIndexes); - return new Iterator<Pair<byte[],List<KeyValue>>>() { + return new Iterator<Pair<byte[],List<Cell>>>() { @Override public boolean hasNext() { @@ -395,18 +394,18 @@ public class PhoenixRuntime { } @Override - public Pair<byte[], List<KeyValue>> next() { + public Pair<byte[], List<Cell>> next() { Pair<byte[],List<Mutation>> pair = iterator.next(); - List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row + List<Cell> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row for (Mutation mutation : pair.getSecond()) { for (List<Cell> keyValueList : mutation.getFamilyCellMap().values()) { for (Cell keyValue : keyValueList) { - keyValues.add(org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(keyValue)); + keyValues.add(PhoenixKeyValueUtil.maybeCopyCell(keyValue)); } } } Collections.sort(keyValues, pconn.getKeyValueBuilder().getKeyValueComparator()); - return new Pair<byte[], List<KeyValue>>(pair.getFirst(),keyValues); + return new Pair<byte[], List<Cell>>(pair.getFirst(),keyValues); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java index f97230b..967f38d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java @@ -46,7 +46,7 @@ public class ResultUtil { public static Result toResult(ImmutableBytesWritable bytes) { byte [] buf = bytes.get(); int offset = bytes.getOffset(); - int finalOffset = bytes.getSize() + offset; + int finalOffset = bytes.getLength() + offset; List<Cell> kvs = new ArrayList<Cell>(); while(offset < finalOffset) { int keyLength = Bytes.toInt(buf, offset); @@ -70,9 +70,8 @@ public class ResultUtil { //key.set(getRawBytes(r), getKeyOffset(r), getKeyLength(r)); } - @SuppressWarnings("deprecation") - public static void getKey(KeyValue value, ImmutableBytesWritable key) { - key.set(value.getBuffer(), value.getRowOffset(), value.getRowLength()); + public static void getKey(Cell value, ImmutableBytesWritable key) { + key.set(value.getRowArray(), value.getRowOffset(), value.getRowLength()); } /** @@ -109,7 +108,7 @@ public class ResultUtil { * @param r */ static int getKeyOffset(Result r) { - KeyValue firstKV = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(r.rawCells()[0]); + KeyValue firstKV = PhoenixKeyValueUtil.maybeCopyCell(r.rawCells()[0]); return firstKV.getOffset(); } @@ -118,9 +117,8 @@ public class ResultUtil { return Bytes.toShort(getRawBytes(r), getKeyOffset(r) - Bytes.SIZEOF_SHORT); } - @SuppressWarnings("deprecation") static byte[] getRawBytes(Result r) { - KeyValue firstKV = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(r.rawCells()[0]); + KeyValue firstKV = PhoenixKeyValueUtil.maybeCopyCell(r.rawCells()[0]); return firstKV.getBuffer(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/TupleUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TupleUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TupleUtil.java index f495a7e..ec9bf49 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TupleUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TupleUtil.java @@ -137,18 +137,17 @@ public class TupleUtil { } } - @SuppressWarnings("deprecation") public static int write(Tuple result, DataOutput out) throws IOException { int size = 0; for(int i = 0; i < result.size(); i++) { - KeyValue kv = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.getValue(i)); + KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(result.getValue(i)); size += kv.getLength(); size += Bytes.SIZEOF_INT; // kv.getLength } WritableUtils.writeVInt(out, size); for(int i = 0; i < result.size(); i++) { - KeyValue kv = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.getValue(i)); + KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(result.getValue(i)); out.writeInt(kv.getLength()); out.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 33ad7e5..43f11b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -648,7 +648,7 @@ public class UpgradeUtil { Table sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); try { logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM); - KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey, + Cell saltKV = PhoenixKeyValueUtil.newKeyValue(seqTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, @@ -667,7 +667,7 @@ public class UpgradeUtil { // This is needed as a fix for https://issues.apache.org/jira/browse/PHOENIX-1401 if (oldTable.getTimeStamp() == MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0) { byte[] oldSeqNum = PLong.INSTANCE.toBytes(oldTable.getSequenceNumber()); - KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey, + Cell seqNumKV = PhoenixKeyValueUtil.newKeyValue(seqTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, @@ -761,7 +761,7 @@ public class UpgradeUtil { if (!success) { if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything // Don't use Delete here as we'd never be able to change it again at this timestamp. - KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey, + Cell unsaltKV = PhoenixKeyValueUtil.newKeyValue(seqTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, @@ -1675,7 +1675,7 @@ public class UpgradeUtil { if (!columnCells.isEmpty() && (timestamp = columnCells.get(0) .getTimestamp()) < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { - KeyValue upgradeKV = KeyValueUtil.newKeyValue(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + Cell upgradeKV = PhoenixKeyValueUtil.newKeyValue(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, UPGRADE_TO_4_7_COLUMN_NAME, timestamp, PBoolean.INSTANCE.toBytes(true)); Put upgradePut = new Put(statsTableKey); upgradePut.add(upgradeKV); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java index 8bb491d..ddbd4a3 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ReadWriteKeyValuesWithCodecTest.java @@ -32,15 +32,16 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.phoenix.hbase.index.IndexTestingUtils; import org.apache.phoenix.hbase.index.wal.IndexedKeyValue; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.junit.BeforeClass; import org.junit.Test; @@ -130,7 +131,7 @@ public class ReadWriteKeyValuesWithCodecTest { private void addMutation(WALEdit edit, Mutation m, byte[] family) { List<Cell> kvs = m.getFamilyCellMap().get(FAMILY); for (Cell kv : kvs) { - edit.add(KeyValueUtil.ensureKeyValue(kv)); + edit.add(PhoenixKeyValueUtil.maybeCopyCell(kv)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java index 8553b73..0374044 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java @@ -27,6 +27,7 @@ import java.sql.DriverManager; import java.util.Iterator; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; @@ -95,19 +96,19 @@ public class MutationStateTest { conn.createStatement().execute("upsert into MUTATION_TEST2(id2,appId2) values(222,'app2')"); - Iterator<Pair<byte[],List<KeyValue>>> dataTableNameAndMutationKeyValuesIter = + Iterator<Pair<byte[],List<Cell>>> dataTableNameAndMutationKeyValuesIter = PhoenixRuntime.getUncommittedDataIterator(conn); assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext()); - Pair<byte[],List<KeyValue>> pair=dataTableNameAndMutationKeyValuesIter.next(); + Pair<byte[],List<Cell>> pair=dataTableNameAndMutationKeyValuesIter.next(); String tableName1=Bytes.toString(pair.getFirst()); - List<KeyValue> keyValues1=pair.getSecond(); + List<Cell> keyValues1=pair.getSecond(); assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext()); pair=dataTableNameAndMutationKeyValuesIter.next(); String tableName2=Bytes.toString(pair.getFirst()); - List<KeyValue> keyValues2=pair.getSecond(); + List<Cell> keyValues2=pair.getSecond(); if("MUTATION_TEST1".equals(tableName1)) { assertTable(tableName1, keyValues1, tableName2, keyValues2); @@ -124,7 +125,7 @@ public class MutationStateTest { } } - private void assertTable(String tableName1,List<KeyValue> keyValues1,String tableName2,List<KeyValue> keyValues2) { + private void assertTable(String tableName1,List<Cell> keyValues1,String tableName2,List<Cell> keyValues2) { assertTrue("MUTATION_TEST1".equals(tableName1)); assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(111),CellUtil.cloneRow(keyValues1.get(0)))); assertTrue("app1".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues1.get(1))))); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java index 195c2f0..5383d9b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java @@ -61,7 +61,7 @@ import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PIntegerArray; import org.apache.phoenix.schema.types.PVarcharArray; import org.apache.phoenix.schema.types.PhoenixArray; -import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.junit.Test; import com.google.common.collect.Lists; @@ -159,7 +159,7 @@ public class UnnestArrayPlanTest { for (Object[] array : arrays) { PhoenixArray pArray = new PhoenixArray(baseType, array); byte[] bytes = arrayType.toBytes(pArray); - tuples.add(new SingleKeyValueTuple(KeyValueUtil.newKeyValue(bytes, 0, bytes.length, bytes, 0, 0, bytes, 0, 0, 0, bytes, 0, 0))); + tuples.add(new SingleKeyValueTuple(PhoenixKeyValueUtil.newKeyValue(bytes, 0, bytes.length, bytes, 0, 0, bytes, 0, 0, 0, bytes, 0, 0))); } return tuples; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java index 6c28cdf..603b68e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.filter.Filter.ReturnCode; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.query.KeyRange; @@ -608,13 +609,13 @@ public class SkipScanFilterTest extends TestCase { @SuppressWarnings("deprecation") @Override public void examine(SkipScanFilter skipper) throws IOException { - KeyValue kv = KeyValue.createFirstOnRow(rowkey); + KeyValue kv = KeyValueUtil.createFirstOnRow(rowkey); skipper.reset(); assertFalse(skipper.filterAllRemaining()); assertFalse(skipper.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())); assertEquals(ReturnCode.SEEK_NEXT_USING_HINT, skipper.filterKeyValue(kv)); - assertEquals(KeyValue.createFirstOnRow(hint), skipper.getNextCellHint(kv)); + assertEquals(KeyValueUtil.createFirstOnRow(hint), skipper.getNextCellHint(kv)); } @Override public String toString() { @@ -634,7 +635,7 @@ public class SkipScanFilterTest extends TestCase { @SuppressWarnings("deprecation") @Override public void examine(SkipScanFilter skipper) throws IOException { - KeyValue kv = KeyValue.createFirstOnRow(rowkey); + KeyValue kv = KeyValueUtil.createFirstOnRow(rowkey); skipper.reset(); assertFalse(skipper.filterAllRemaining()); assertFalse(skipper.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())); @@ -657,7 +658,7 @@ public class SkipScanFilterTest extends TestCase { } @Override public void examine(SkipScanFilter skipper) throws IOException { - KeyValue kv = KeyValue.createFirstOnRow(rowkey); + KeyValue kv = KeyValueUtil.createFirstOnRow(rowkey); skipper.reset(); assertEquals(ReturnCode.NEXT_ROW,skipper.filterKeyValue(kv)); skipper.reset(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java index 5868103..39e9680 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; @@ -72,11 +73,11 @@ public class IndexTestingUtils { // s.setRaw(true); s.setMaxVersions(); s.setTimeRange(start, end); - List<KeyValue> received = new ArrayList<KeyValue>(); + List<Cell> received = new ArrayList<Cell>(); ResultScanner scanner = index1.getScanner(s); for (Result r : scanner) { - received.addAll(r.list()); - LOG.debug("Received: " + r.list()); + received.addAll(r.listCells()); + LOG.debug("Received: " + r.listCells()); } scanner.close(); assertEquals("Didn't get the expected kvs from the index table!", expected, received); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java index fc4734d..a83aeff 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java @@ -17,6 +17,7 @@ import java.util.Map.Entry; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; @@ -172,8 +173,11 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { } // there is a next value - we only care about the current value, so we can just snag that Cell next = kvs.next(); - if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { - byte[] v = next.getValue(); + if (ref.matchesFamily(next.getFamilyArray(), next.getFamilyOffset(), + next.getFamilyLength()) + && ref.matchesQualifier(next.getQualifierArray(), next.getQualifierOffset(), + next.getQualifierLength())) { + byte[] v = CellUtil.cloneValue(next); totalValueLength += v.length; entries.add(new ColumnEntry(v, ref)); } else { @@ -188,20 +192,20 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { } // matches all columns, so we need to iterate until we hit the next column with the same // family as the current key - byte[] lastQual = next.getQualifier(); + byte[] lastQual = CellUtil.cloneQualifier(next); byte[] nextQual = null; while ((next = kvs.next()) != null) { // different family, done with this column - if (!ref.matchesFamily(next.getFamily())) { + if (!ref.matchesFamily(next.getFamilyArray(), next.getFamilyOffset(), next.getFamilyLength())) { break; } - nextQual = next.getQualifier(); + nextQual = CellUtil.cloneQualifier(next); // we are still on the same qualifier - skip it, since we already added a column for it if (Arrays.equals(lastQual, nextQual)) { continue; } // this must match the qualifier since its an all-qualifiers specifier, so we add it - byte[] v = next.getValue(); + byte[] v = CellUtil.cloneValue(next); totalValueLength += v.length; entries.add(new ColumnEntry(v, ref)); // update the last qualifier to check against @@ -285,7 +289,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { * expected value--column pair * @return a keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs. */ - public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp, + public static List<Cell> getIndexKeyValueForTesting(byte[] pk, long timestamp, List<Pair<byte[], CoveredColumn>> values) { int length = 0; @@ -299,8 +303,8 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected); Put p = new Put(rowKey, timestamp); CoveredColumnIndexCodec.addColumnsToPut(p, expected); - List<KeyValue> kvs = new ArrayList<KeyValue>(); - for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) { + List<Cell> kvs = new ArrayList<Cell>(); + for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap().entrySet()) { kvs.addAll(entry.getValue()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java index b3b143e..cc74dda 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java @@ -40,6 +40,7 @@ import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; +import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -99,7 +100,7 @@ public class LocalTableStateTest { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, state, m); //add the kvs from the mutation - table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); + table.addPendingUpdates(m.get(fam, qual)); // setup the lookup ColumnReference col = new ColumnReference(fam, qual); @@ -145,7 +146,7 @@ public class LocalTableStateTest { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, state, m); //add the kvs from the mutation - table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); + table.addPendingUpdates(m.get(fam, qual)); // setup the lookup ColumnReference col = new ColumnReference(fam, qual); @@ -182,7 +183,7 @@ public class LocalTableStateTest { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, state, m); //add the kvs from the mutation - table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual))); + table.addPendingUpdates(m.get(fam, qual)); // setup the lookup ColumnReference col = new ColumnReference(fam, qual); @@ -224,7 +225,7 @@ public class LocalTableStateTest { LocalHBaseState state = new LocalTable(env); LocalTableState table = new LocalTableState(env, state, m); // add the kvs from the mutation - KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0)); + KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(m.get(fam, qual).get(0)); kv.setSequenceId(0); table.addPendingUpdates(kv); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java index 7dd46d6..8c0a693 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java @@ -165,7 +165,7 @@ public class TestCoveredColumnIndexCodec { // start with a basic put that has some keyvalues Put p = new Put(PK); // setup the kvs to add - List<KeyValue> kvs = new ArrayList<KeyValue>(); + List<Cell> kvs = new ArrayList<Cell>(); byte[] v1 = Bytes.toBytes("v1"); KeyValue kv = new KeyValue(PK, FAMILY, QUAL, 1, v1); kvs.add(kv); @@ -203,7 +203,7 @@ public class TestCoveredColumnIndexCodec { d.addFamily(FAMILY, 2); // setup the next batch of 'current state', basically just ripping out the current state from // the last round - table = new SimpleTableState(new Result(kvs)); + table = new SimpleTableState(Result.create(kvs)); state = new LocalTableState(env, table, d); state.setCurrentTimestamp(2); // check the cleanup of the current table, after the puts (mocking a 'next' update) @@ -230,13 +230,13 @@ public class TestCoveredColumnIndexCodec { ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d); } - private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<KeyValue> currentState, + private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<Cell> currentState, Delete d) throws IOException { - LocalHBaseState table = new SimpleTableState(new Result(currentState)); + LocalHBaseState table = new SimpleTableState(Result.create(currentState)); LocalTableState state = new LocalTableState(env, table, d); state.setCurrentTimestamp(d.getTimeStamp()); // now we shouldn't see anything when getting the index update - state.addPendingUpdates(d.getFamilyMap().get(FAMILY)); + state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY)); Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA); for (IndexUpdate update : updates) { assertFalse("Had some index updates, though it should have been covered by the delete", http://git-wip-us.apache.org/repos/asf/phoenix/blob/c82cc18d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestIndexMemStore.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestIndexMemStore.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestIndexMemStore.java index 400757d..bcd5666 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestIndexMemStore.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/data/TestIndexMemStore.java @@ -20,8 +20,10 @@ package org.apache.phoenix.hbase.index.covered.data; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.scanner.ReseekableScanner; import org.junit.Test; @@ -36,7 +38,7 @@ public class TestIndexMemStore { @Test public void testCorrectOverwritting() throws Exception { - IndexMemStore store = new IndexMemStore(IndexMemStore.COMPARATOR); + IndexMemStore store = new IndexMemStore(CellComparatorImpl.COMPARATOR); long ts = 10; KeyValue kv = new KeyValue(row, family, qual, ts, Type.Put, val); kv.setSequenceId(2); @@ -46,7 +48,7 @@ public class TestIndexMemStore { // adding the exact same kv shouldn't change anything stored if not overwritting store.add(kv2, false); ReseekableScanner scanner = store.getScanner(); - KeyValue first = KeyValue.createFirstOnRow(row); + KeyValue first = KeyValueUtil.createFirstOnRow(row); scanner.seek(first); assertTrue("Overwrote kv when specifically not!", kv == scanner.next()); scanner.close(); @@ -80,7 +82,7 @@ public class TestIndexMemStore { // null qualifiers should always sort before the non-null cases ReseekableScanner scanner = store.getScanner(); - KeyValue first = KeyValue.createFirstOnRow(row); + KeyValue first = KeyValueUtil.createFirstOnRow(row); assertTrue("Didn't have any data in the scanner", scanner.seek(first)); assertTrue("Didn't get delete family first (no qualifier == sort first)", df == scanner.next()); assertTrue("Didn't get point delete before corresponding put", d == scanner.next());
