http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java index 00ece40..15a9f74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -80,6 +81,11 @@ public abstract class CloneExpressionVisitor extends TraverseAllExpressionVisito public Expression visit(KeyValueColumnExpression node) { return node; } + + @Override + public Expression visit(ArrayColumnExpression node) { + return node; + } @Override public Expression visit(ProjectedColumnExpression node) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java index 31f340d..100f099 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java @@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -113,6 +114,7 @@ public interface ExpressionVisitor<E> { public E visit(LiteralExpression node); public E visit(RowKeyColumnExpression node); public E visit(KeyValueColumnExpression node); + public E visit(ArrayColumnExpression node); public E visit(ProjectedColumnExpression node); public E visit(SequenceValueExpression node); http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java index 3b7067a..9e50bc4 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -121,6 +121,11 @@ public class StatelessTraverseAllExpressionVisitor<E> extends TraverseAllExpress } @Override + public E visit(ArrayColumnExpression node) { + return null; + } + + @Override public E visit(ProjectedColumnExpression node) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java index 83b28bd..1a2f2cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import 
org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -114,6 +114,11 @@ public class StatelessTraverseNoExpressionVisitor<E> extends TraverseNoExpressio public E visit(RowKeyColumnExpression node) { return null; } + + @Override + public E visit(ArrayColumnExpression node) { + return null; + } @Override public E visit(KeyValueColumnExpression node) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java index 92e5c20..3d6843d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.EncodedColumnsUtil; /** * When selecting specific columns in a SELECT query, this filter passes only selected columns @@ -54,6 +54,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { private byte[] 
emptyCFName; private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker; private Set<byte[]> conditionOnlyCfs; + private boolean usesEncodedColumnNames; + private byte[] emptyKVQualifier; public ColumnProjectionFilter() { @@ -61,10 +63,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { public ColumnProjectionFilter(byte[] emptyCFName, Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker, - Set<byte[]> conditionOnlyCfs) { + Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) { this.emptyCFName = emptyCFName; this.columnsTracker = columnsTracker; this.conditionOnlyCfs = conditionOnlyCfs; + this.usesEncodedColumnNames = usesEncodedColumnNames; + this.emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); } @Override @@ -88,6 +92,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { familyMapSize--; } int conditionOnlyCfsSize = WritableUtils.readVInt(input); + usesEncodedColumnNames = conditionOnlyCfsSize > 0; + emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value. this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); while (conditionOnlyCfsSize > 0) { this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input)); @@ -111,12 +118,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { } } } - // Write conditionOnlyCfs - WritableUtils.writeVInt(output, this.conditionOnlyCfs.size()); + // Encode usesEncodedColumnNames in conditionOnlyCfs size. + WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 
1 : -1)); for (byte[] f : this.conditionOnlyCfs) { WritableUtils.writeCompressedByteArray(output, f); } - } + +} @Override public byte[] toByteArray() throws IOException { @@ -156,9 +164,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { // make sure we're not holding to any of the byte[]'s ptr.set(HConstants.EMPTY_BYTE_ARRAY); if (kvs.isEmpty()) { - kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName, - 0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); + kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(), + this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0, + emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index dba700b..a7146fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -26,6 +26,7 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.ExpressionVisitor; @@ -94,7 +95,7 @@ public abstract 
class MultiKeyValueComparisonFilter extends BooleanExpressionFil refCount = foundColumns.size(); } - public ReturnCode resolveColumn(Cell value) { + private ReturnCode resolveColumn(Cell value) { // Always set key, in case we never find a key value column of interest, // and our expression uses row key columns. setKey(value); @@ -184,7 +185,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName()); + inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java index 0d904bc..195c89c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java @@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFi public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { try { - return (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + SingleCQKeyValueComparisonFilter writable = (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + return writable; } catch (IOException e) { throw new DeserializationException(e); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java index eaf8d35..b97c4e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java @@ -22,11 +22,13 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; @@ -58,7 +60,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi @Override public Void visit(KeyValueColumnExpression expression) { cf = expression.getColumnFamily(); - cq = expression.getColumnName(); + cq = expression.getColumnQualifier(); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index bcadc2b..19797cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -35,4 +35,5 @@ public interface ValueGetter { public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; public byte[] getRowKey(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 6f9caa6..0f960e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; import com.google.common.collect.Lists; -import com.google.common.collect.Lists; /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java index 741bf87..56b60e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/KeyValueBuilder.java @@ -125,4 +125,5 @@ public abstract class KeyValueBuilder { public abstract KVComparator getKeyValueComparator(); public abstract List<Mutation> cloneIfNecessary(List<Mutation> mutations); + } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 237ed75..7c88a25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -17,12 +17,15 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier; + import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +35,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -44,16 +48,20 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import 
org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -67,14 +75,16 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.AmbiguousColumnException; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SaltingUtil; @@ -82,10 +92,13 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -93,6 +106,7 @@ import 
org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.tephra.TxConstants; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -276,8 +290,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; private Set<ColumnReference> coveredColumns; - // Map used to cache column family of data table and the corresponding column family for the local index - private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap; + // Information for columns of data tables that are being indexed. The first part of the pair is column family and second part is the column name. + private Set<Pair<String, String>> indexedColumnsInfo; + // Information for columns of data tables that are being covered by the index. The first part of the pair is column family and second part is the column name. + private Set<Pair<String, String>> coveredColumnsInfo; + // Map of covered columns where a key is column reference for a column in the data table + // and value is column reference for corresponding column in the index table. + // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes. + private Map<ColumnReference, ColumnReference> coveredColumnsMap; // columns required to create index row i.e. 
indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -291,39 +311,40 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; + private boolean storeColsInSingleCell; // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - private List<ImmutableBytesPtr> indexQualifiers; private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; + private boolean usesEncodedColumnNames; + private ImmutableBytesPtr emptyKeyValueQualifierPtr; private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? 
null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; - + this.usesEncodedColumnNames = EncodedColumnsUtil.usesEncodedColumnNames(index); byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum(); boolean indexWALDisabled = index.isWALDisabled(); int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1); -// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0; int nIndexColumns = index.getColumns().size() - indexPosOffset; int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset; // number of expressions that are indexed that are not present in the row key of the data table @@ -334,7 +355,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { - PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName); + PColumn dataColumn = dataFamilyName.equals("") ? 
dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { @@ -367,7 +388,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); - this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -376,6 +397,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // TODO: check whether index is immutable or not. Currently it's always false so checking // data table is with immutable rows or not. 
this.immutableRows = dataTable.isImmutableRows(); + this.storeColsInSingleCell = index.getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; int indexColByteSize = 0; ColumnResolver resolver = null; List<ParseNode> parseNodes = new ArrayList<ParseNode>(1); @@ -397,6 +419,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); + this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + this.coveredColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); + IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { PColumn indexColumn = index.getPKColumns().get(i); @@ -409,12 +434,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data table that corresponds to this index column + // get the column of the data column that corresponds to this index column PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); boolean isPKColumn = SchemaUtil.isPKColumn(column); if (isPKColumn) { int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0); this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos); + indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString())); } else { indexColByteSize += column.getDataType().isFixedWidth() ? 
SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; try { @@ -424,6 +450,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression = CoerceExpression.create(expression, indexColumn.getDataType()); } this.indexedExpressions.add(expression); + indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString())); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } @@ -432,6 +459,45 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { else { indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE; this.indexedExpressions.add(expression); + KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() { + @Override + public Void visit(KeyValueColumnExpression colExpression) { + return addDataColInfo(dataTable, colExpression); + } + + @Override + public Void visit(ArrayColumnExpression expression) { + return addDataColInfo(dataTable, expression); + } + + private Void addDataColInfo(final PTable dataTable, Expression expression) { + Preconditions.checkArgument(expression instanceof ArrayColumnExpression + || expression instanceof KeyValueColumnExpression); + + KeyValueColumnExpression colExpression = null; + if (expression instanceof ArrayColumnExpression) { + colExpression = + ((ArrayColumnExpression) expression).getKeyValueExpression(); + } else { + colExpression = ((KeyValueColumnExpression) expression); + } + byte[] cf = colExpression.getColumnFamily(); + byte[] cq = colExpression.getColumnQualifier(); + try { + PColumn dataColumn = + cf == null ? 
dataTable.getPColumnForColumnQualifier(null, cq) + : dataTable.getColumnFamily(cf) + .getPColumnForColumnQualifier(cq); + indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() + .getString(), dataColumn.getName().getString())); + } catch (ColumnNotFoundException | ColumnFamilyNotFoundException + | AmbiguousColumnException e) { + throw new RuntimeException(e); + } + return null; + } + }; + expression.accept(kvVisitor); } // set the sort order of the expression correctly if (indexColumn.getSortOrder() == SortOrder.DESC) { @@ -442,18 +508,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (int i = 0; i < index.getColumnFamilies().size(); i++) { PColumnFamily family = index.getColumnFamilies().get(i); for (PColumn indexColumn : family.getColumns()) { - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - PName dataTableFamily = column.getFamilyName(); - this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes())); - if(isLocalIndex) { - this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString())))); - } + PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + byte[] dataColumnCq = EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable); + byte[] indexColumnCq = EncodedColumnsUtil.getColumnQualifier(indexColumn, index); + this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq)); + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); + this.coveredColumnsInfo.add(new Pair<>(dataColumn.getFamilyName().getString(), dataColumn.getName().getString())); } } this.estimatedIndexRowKeyBytes = 
estimateIndexRowKeyByteSize(indexColByteSize); initCachedState(); } - + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; @@ -856,35 +923,113 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - Put put = null; + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); + Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, + this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts, // set the value to the empty column name - QueryConstants.EMPTY_COLUMN_BYTES_PTR)); + emptyKeyValueQualifierPtr)); put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); } - int i = 0; - for (ColumnReference ref : this.getCoveredColumns()) { - ImmutableBytesPtr cq = this.indexQualifiers.get(i++); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (value != null) { + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); + if (storeColsInSingleCell) { + // map from index column family to list of pair of index column and data column (for covered columns) + Map<ByteBuffer, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap(); + for (ColumnReference ref : this.getCoveredColumns()) { + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ByteBuffer cf = ByteBuffer.wrap(indexColRef.getFamily()); + if (!familyToColListMap.containsKey(cf)) { + familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList()); + } + familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref)); + } + // iterate over each column family and create a byte[] containing all the columns + for (Entry<ByteBuffer, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) { + byte[] columnFamily = entry.getKey().array(); + List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue(); + int maxIndex = Integer.MIN_VALUE; + // find the max col qualifier + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + int qualifier = getEncodedColumnQualifier(colRefPair.getFirst().getQualifier()); + maxIndex = Math.max(maxIndex, qualifier); + } + byte[][] colValues = new byte[maxIndex+1][]; + // set the values of the columns + for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { + ColumnReference indexColRef = colRefPair.getFirst(); + ColumnReference dataColRef = colRefPair.getSecond(); + int 
dataArrayPos = getEncodedColumnQualifier(dataColRef.getQualifier()); + Expression expression = new ArrayColumnExpression(new PDatum() { + @Override + public boolean isNullable() { + return false; + } + + @Override + public SortOrder getSortOrder() { + return null; + } + + @Override + public Integer getScale() { + return null; + } + + @Override + public Integer getMaxLength() { + return null; + } + + @Override + public PDataType getDataType() { + return null; + } + }, dataColRef.getFamily(), dataArrayPos); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + expression.evaluate(new ValueGetterTuple(valueGetter), ptr); + byte[] value = ptr.copyBytesIfNecessary(); + if (value != null) { + int indexArrayPos = getEncodedColumnQualifier(indexColRef.getQualifier()); + colValues[indexArrayPos] = value; + } + } + + List<Expression> children = Lists.newArrayListWithExpectedSize(colRefPairs.size()); + // create an expression list with all the columns + for (int j=0; j<colValues.length; ++j) { + children.add(new LiteralExpression(colValues[j]==null ? ByteUtil.EMPTY_BYTE_ARRAY : colValues[j] )); + } + // we use ArrayConstructorExpression to serialize multiple columns into a single byte[] + // construct the ArrayConstructorExpression with a variable length data type (PVarchar) since columns can be of fixed or variable length + ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarchar.INSTANCE, rowKeyOrderOptimizable); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + arrayExpression.evaluate(new BaseTuple() {}, ptr); if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - if(this.isLocalIndex) { - ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()); - put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value)); - } else { - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, colFamilyPtr, ts, ptr)); + } + } + else { + for (ColumnReference ref : this.getCoveredColumns()) { + //FIXME: samarth figure out a backward compatible way to handle this as coveredColumnsMap won't be available for older phoenix clients. + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); + ImmutableBytesWritable value = valueGetter.getLatestValue(ref); + if (value != null) { + if (put == null) { + put = new Put(indexRowKey); + put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); + } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } } @@ -973,14 +1118,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (ColumnReference ref : getCoveredColumns()) { byte[] family = ref.getFamily(); - if (this.isLocalIndex) { - family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get(); - } + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete.deleteFamilyVersion(family, ts); + delete.deleteFamilyVersion(indexColumn.getFamily(), ts); } else { - delete.deleteFamily(family, ts); + delete.deleteFamily(indexColumn.getFamily(), ts); } } if (deleteType == DeleteType.SINGLE_VERSION) { @@ -1001,12 +1144,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily(); + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs. + // Figure out a backward compatible way of doing this since coveredColumnsMap won't be available + // for older clients. 
+ delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { - delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } } } @@ -1061,15 +1207,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); - dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { - byte[] cf = Bytes.readByteArray(input); - byte[] cq = Bytes.readByteArray(input); - ColumnReference ref = new ColumnReference(cf,cq); - coveredColumns.add(ref); - if(isLocalIndex) { - dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))))); - } + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + byte[] indexTableCf = Bytes.readByteArray(input); + byte[] indexTableCq = Bytes.readByteArray(input); + ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq); + coveredColumns.add(dataColumn); + ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataColumn, indexColumn); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1095,6 +1242,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (isNewClient) { int numIndexedExpressions = WritableUtils.readVInt(input); + usesEncodedColumnNames = numIndexedExpressions > 0; + numIndexedExpressions = Math.abs(numIndexedExpressions) - 1; indexedExpressions = 
Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); @@ -1148,6 +1297,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedEstimatedIndexRowKeyBytesAndImmutableRows = WritableUtils.readVInt(input); this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows < 0; this.estimatedIndexRowKeyBytes = Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows); + int numCols = WritableUtils.readVInt(input); + //TODO: samarth figure out a backward compatible way of reading/writing indexedColumnsInfo + indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); + for (int i = 1; i <= numCols; i++) { + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + } + coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); + int numCoveredCols = WritableUtils.readVInt(input); + for (int i = 1; i <= numCoveredCols; i++) { + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + } + storeColsInSingleCell = WritableUtils.readVInt(input) > 0; initCachedState(); } @@ -1171,9 +1336,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Encode coveredColumns.size() and whether or not this is a local index WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? 
-1 : 1)); - for (ColumnReference ref : coveredColumns) { - Bytes.writeByteArray(output, ref.getFamily()); - Bytes.writeByteArray(output, ref.getQualifier()); + for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) { + ColumnReference dataColumn = ref.getKey(); + ColumnReference indexColumn = ref.getValue(); + Bytes.writeByteArray(output, dataColumn.getFamily()); + Bytes.writeByteArray(output, dataColumn.getQualifier()); + Bytes.writeByteArray(output, indexColumn.getFamily()); + Bytes.writeByteArray(output, indexColumn.getQualifier()); } // TODO: remove when rowKeyOrderOptimizable hack no longer needed WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1)); @@ -1184,7 +1353,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength()); output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength()); - WritableUtils.writeVInt(output, indexedExpressions.size()); + // Hack to encode usesEncodedColumnNames in indexedExpressions size. + int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1); + WritableUtils.writeVInt(output, indexedExpressionsSize); for (Expression expression : indexedExpressions) { WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); expression.write(output); @@ -1195,6 +1366,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1)); // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1)); + WritableUtils.writeVInt(output, indexedColumnsInfo.size()); + for (Pair<String, String> colInfo : indexedColumnsInfo) { + Bytes.writeByteArray(output, colInfo.getFirst() == null ? 
null : colInfo.getFirst().getBytes()); + Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); + } + WritableUtils.writeVInt(output, coveredColumnsInfo.size()); + for (Pair<String, String> colInfo : coveredColumnsInfo) { + Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : colInfo.getFirst().getBytes()); + Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); + } + WritableUtils.writeVInt(output, storeColsInSingleCell ? 1 : -1); } public int getEstimatedByteSize() { @@ -1241,16 +1423,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - dataEmptyKeyValueRef = - new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), - QueryConstants.EMPTY_COLUMN_BYTES); - - indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { - indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName( - ref.getFamily(), ref.getQualifier()))); - } - + byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier); this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); @@ -1258,7 +1433,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), 
expression.getColumnName()))) { + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { indexedColumnTypes.add(expression.getDataType()); } return null; @@ -1523,4 +1698,26 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return udfParseNodes; } } + + public byte[] getEmptyKeyValueQualifier() { + return emptyKeyValueQualifierPtr.copyBytes(); + } + + public Set<Pair<String, String>> getCoveredColumnInfo() { + return coveredColumnsInfo; + } + + public Set<Pair<String, String>> getIndexedColumnInfo() { + return indexedColumnsInfo; + } + + public StorageScheme getIndexStorageScheme() { + if (storeColsInSingleCell) { + return StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; + } + if (usesEncodedColumnNames) { + return StorageScheme.ENCODED_COLUMN_NAMES; + } + return StorageScheme.NON_ENCODED_COLUMN_NAMES; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index bf1d0fb..05211c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -166,7 +166,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - get.addColumn(expression.getColumnFamily(), expression.getColumnName()); + get.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); estimatedSizeHolder[0]++; return null; } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index eb73d6b..d382005 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -158,12 +158,13 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration())); // Mimic the Put that gets generated by the client on an update of the index state Put put = new Put(indexTableKey); - if (blockWriteRebuildIndex) + if (blockWriteRebuildIndex) { put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.ACTIVE.getSerializedBytes()); - else + } else { put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, PIndexState.DISABLE.getSerializedBytes()); + } put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(minTimeStamp)); final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put); http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index c67da6e..9ee5ea7 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; @@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { for (ColumnReference ref : mutableColumns) { scan.addColumn(ref.getFamily(), ref.getQualifier()); } + /* + * Indexes inherit the storage scheme of the data table which means all the indexes have the same + * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start + * supporting new indexes over existing data tables to have a different storage scheme than the data + * table. 
+ */ + byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier(); + // Project empty key value column - scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); @@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException { if (scanner != null) { Result result; - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0) + .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); @@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // to generate point delete markers for all index rows that were added. We don't have Tephra // manage index rows in change sets because we don't want to be hit with the additional // memory hit and do not need to do conflict detection on index rows. 
- ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index c0f9707..cdd9cbc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,12 +17,18 @@ */ package org.apache.phoenix.iterate; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; +import static org.apache.phoenix.schema.PTable.IndexType.LOCAL; +import static org.apache.phoenix.schema.PTableType.INDEX; import static 
org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.util.EncodedColumnsUtil.getEncodedColumnQualifier; +import static org.apache.phoenix.util.ScanUtil.setQualifierRanges; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -78,20 +84,25 @@ import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PrefixByteCodec; import org.apache.phoenix.util.PrefixByteDecoder; @@ -159,7 +170,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return true; } - private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) { + private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException { StatementContext context = plan.getContext(); 
TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); @@ -210,7 +221,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Project empty key value unless the column family containing it has // been projected in its entirety. if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst()); } } } @@ -228,7 +239,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if(offset!=null){ ScanUtil.addOffsetAttribute(scan, offset); } - int cols = plan.getGroupBy().getOrderPreservingColumnCount(); if (cols > 0 && keyOnlyFilter && !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) && @@ -243,13 +253,86 @@ public abstract class BaseResultIterators extends ExplainTable implements Result ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit())); } } - + // When analyzing the table, there is no look up for key values being done. + // So there is no point setting the range. 
+ if (setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) { + Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context); + if (range != null) { + scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, getEncodedColumnQualifier(range.getFirst())); + scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, getEncodedColumnQualifier(range.getSecond())); + } + } if (optimizeProjection) { optimizeProjection(context, scan, table, statement); } } } + + private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context) + throws SQLException { + PTable table = context.getCurrentTable().getTable(); + StorageScheme storageScheme = table.getStorageScheme(); + checkArgument(storageScheme == StorageScheme.ENCODED_COLUMN_NAMES, + "Method should only be used for tables using encoded column names"); + Pair<Integer, Integer> minMaxQualifiers = new Pair<>(); + for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { + byte[] cq = whereCol.getSecond(); + if (cq != null) { + int qualifier = getEncodedColumnQualifier(cq); + determineQualifierRange(qualifier, minMaxQualifiers); + } + } + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + + Map<String, Pair<Integer, Integer>> qualifierRanges = SchemaUtil.getQualifierRanges(table); + for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + if (entry.getValue() != null) { + for (byte[] cq : entry.getValue()) { + if (cq != null) { + int qualifier = getEncodedColumnQualifier(cq); + determineQualifierRange(qualifier, minMaxQualifiers); + } + } + } else { + /* + * All the columns of the column family are being projected. So we will need to + * consider all the columns in the column family to determine the min-max range. 
+ */ + String family = Bytes.toString(entry.getKey()); + if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) { + //TODO: samarth confirm with James why do we need this hack here :( + family = IndexUtil.getLocalIndexColumnFamily(family); + } + Pair<Integer, Integer> range = qualifierRanges.get(family); + determineQualifierRange(range.getFirst(), minMaxQualifiers); + determineQualifierRange(range.getSecond(), minMaxQualifiers); + } + } + if (minMaxQualifiers.getFirst() == null) { + return null; + } + return minMaxQualifiers; + } + /** + * Widens the running min/max pair in place so that it covers the given qualifier. + * @param qualifier encoded column qualifier to fold into the range + * @param minMaxQualifiers pair of (min, max) qualifiers seen so far; first is null until a qualifier has been recorded + * Nothing is returned; callers read the result back from minMaxQualifiers. + */ + private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) { + if (minMaxQualifiers.getFirst() == null) { + minMaxQualifiers.setFirst(qualifier); + minMaxQualifiers.setSecond(qualifier); + } else { + if (minMaxQualifiers.getFirst() > qualifier) { + minMaxQualifiers.setFirst(qualifier); + } else if (minMaxQualifiers.getSecond() < qualifier) { + minMaxQualifiers.setSecond(qualifier); + } + } + } + private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); // columnsTracker contain cf -> qualifiers which should get returned. @@ -346,7 +429,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // the ExplicitColumnTracker not to be used, though. 
if (!statement.isAggregate() && filteredColumnNotInProjection) { ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs)); + columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme()))); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index 3293f65..1e5f09e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator { }; } - private final static Tuple UNINITIALIZED = new ResultTuple(); + private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE; private Tuple next = UNINITIALIZED; abstract protected Tuple advance() throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java index 8ada952..135ab26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java @@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { return this.index; } + @Override public int size() { if 
(flushBuffer) return flushedCount; http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 8dcb2e8..e4c52c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Function; @@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator { } this.byteSize = queueEntries.getByteSize(); } catch (IOException e) { - throw new SQLException("", e); + ServerUtil.createIOException(e.getMessage(), e); } finally { delegate.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 88e141a..816b78c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -24,16 +24,24 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; - public RegionScannerResultIterator(RegionScanner scanner) { + public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers) { this.scanner = scanner; + this.useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers); + this.minMaxQualifiers = minMaxQualifiers; } @Override @@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { synchronized (scanner) { try { // TODO: size - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned @@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { } // We instantiate a new tuple because in all cases currently we hang on to it // (i.e. to compute and hold onto the TopN). - MultiKeyValueTuple tuple = new MultiKeyValueTuple(); + Tuple tuple = useQualifierAsIndex ? 
new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); tuple.setKeyValues(results); return tuple; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index b4f2adc..bc81447 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT); public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP"; public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP); - + public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY; public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; @@ -320,6 +320,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { /** Version below which we fall back on the generic KeyValueBuilder */ public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); + public static final String STORAGE_SCHEME = "STORAGE_SCHEME"; + public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME); + public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER"; + public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(ENCODED_COLUMN_QUALIFIER); + public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER"; + public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER); + 
PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; @@ -593,9 +600,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { newCells.addAll(cells); newCells.add(kv); Collections.sort(newCells, KeyValue.COMPARATOR); - resultTuple.setResult(Result.create(newCells)); + tuple = new ResultTuple(Result.create(newCells)); } - return tuple; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 47c17ae..3ca48a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private final static String STRING_FALSE = "0"; private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0); private final static Integer INTEGER_FALSE = Integer.valueOf(0); - private final static Tuple BEFORE_FIRST = new ResultTuple(); + private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE; private final ResultIterator scanner; private final RowProjector rowProjector; http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 
908a117..2d7550a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory { int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset); offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]); ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize); + //TODO: samarth make joins work with position look up. Tuple result = new ResultTuple(ResultUtil.toResult(value)); ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions); List<Tuple> tuples = hashCacheMap.get(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/670b53a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index b12326a..a6a57c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri not care about it */ private void initColumnIndexes() throws SQLException { - columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR); + columnIndexes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR); int columnIndex = 0; for(int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); @@ -216,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) // Skip PK column + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + cq = EncodedColumnsUtil.getColumnQualifier(c, table); + } else { + // TODO: samarth verify if this is the right thing to do here. + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if (!columnIndexes.containsKey(cfn)) { columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, - QueryConstants.EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } @@ -243,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri private int findIndex(Cell cell) throws IOException { byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + byte[] cfn = 
Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if(columnIndexes.containsKey(cfn)) { return columnIndexes.get(cfn); }