Repository: phoenix Updated Branches: refs/heads/encodecolumns2 a4b58c070 -> 42d049275
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 3c45295..7b0451a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -371,7 +371,7 @@ public abstract class BaseQueryPlan implements QueryPlan { } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection()); - scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr)); if (dataTable.isTransactional()) { scan.setAttribute(BaseScannerRegionObserver.TX_STATE, context.getConnection().getMutationState().encodeTransaction()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index a030150..4da26c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -1191,7 +1191,7 @@ public class MutationState implements SQLCloseable { } mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); if (attribValue != null) { - mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); if (txState.length > 0) { mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 99b6bf8..d3a3ca4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -17,7 +17,10 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; @@ -29,6 +32,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.Writable; @@ -52,6 +57,8 @@ import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos; +import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ColumnInfo; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; @@ -116,10 +123,10 @@ import com.google.common.collect.Sets; * row and caches any covered columns. Client-side serializes into byte array using * @link #serialize(PTable, ImmutableBytesWritable)} * and transmits to server-side through either the - * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_MD} + * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD} * Mutation attribute or as a separate RPC call using * {@link org.apache.phoenix.cache.ServerCacheClient}) - * + * * * @since 2.1.0 */ @@ -127,8 +134,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private static final int EXPRESSION_NOT_PRESENT = -1; private static final int ESTIMATED_EXPRESSION_SIZE = 8; - - public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { + + public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) { if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) { throw new IllegalArgumentException(); } @@ -190,14 +197,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } } int nIndexes = 0; - int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() + 2; while (indexesItr.hasNext()) { nIndexes++; - PTable index = indexesItr.next(); - estimatedSize += index.getIndexMaintainer(dataTable, connection).getEstimatedByteSize(); + indexesItr.next(); } - TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1); - DataOutput output = new DataOutputStream(stream); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(stream); try { // Encode data table salting in sign of number of indexes WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); @@ -207,15 +212,23 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator()) : nonDisabledIndexIterator(indexes.iterator()); while (indexesItr.hasNext()) { - indexesItr.next().getIndexMaintainer(dataTable, connection).write(output); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); + byte[] protoBytes = proto.toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible } - ptr.set(stream.getBuffer(), 0, stream.size()); + ptr.set(stream.toByteArray(), 0, stream.size()); } - + /** + * For client-side to append serialized IndexMaintainers of keyValueIndexes + * @param dataTable data table + * @param indexMetaDataPtr bytes pointer to hold returned serialized value + * @param keyValueIndexes indexes to serialize + */ public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr, List<PTable> keyValueIndexes, PhoenixConnection connection) { int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr); @@ -241,7 +254,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Serialize mutable indexes afterwards for (PTable index : keyValueIndexes) { - index.getIndexMaintainer(table, connection).write(output); + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + byte[] protoBytes = IndexMaintainer.toProto(maintainer).toByteArray(); + WritableUtils.writeVInt(output, protoBytes.length); + output.write(protoBytes); } } catch (IOException e) { throw new RuntimeException(e); // Impossible @@ -250,15 +266,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr, - KeyValueBuilder builder) { - return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength()); + KeyValueBuilder builder, boolean useProtoForIndexMaintainer) { + return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf) { - return deserialize(buf, 0, buf.length); + public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) { + return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer); } - public static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) { + private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); @@ -270,31 +286,31 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { rowKeySchema.readFields(input); maintainers = Lists.newArrayListWithExpectedSize(size); for (int i = 0; i < size; i++) { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } } catch (IOException e) { throw new RuntimeException(e); // Impossible } return maintainers; } - + private byte[] viewIndexId; private boolean isMultiTenant; // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column private List<Expression> indexedExpressions; // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; - private Set<ColumnReference> coveredColumns; - // Information for columns of data tables that are being indexed. The first part of the pair is column family and second part is the column name. - private Set<Pair<String, String>> indexedColumnsInfo; - // Information for columns of data tables that are being covered by the index. The first part of the pair is column family and second part is the column name. - private Set<Pair<String, String>> coveredColumnsInfo; - // Map of covered columns where a key is column reference for a column in the data table - // and value is column reference for corresponding column in the index table. - // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes. - private Map<ColumnReference, ColumnReference> coveredColumnsMap; + // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -308,7 +324,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; - // Transient state private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; @@ -319,15 +334,30 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; - private ImmutableBytesPtr emptyKeyValueQualifierPtr; + + /**** START: New member variables added in 4.10 *****/ private QualifierEncodingScheme encodingScheme; private ImmutableStorageScheme immutableStorageScheme; - + /* + * Information for columns of data tables that are being indexed. The first part of the pair is column family name + * and second part is the column name. The reason we need to track this state is because for certain storage schemes + * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an index + * table put/delete is different from the columns that are indexed in the phoenix schema. This information helps us + * determine whether or not certain operations like DROP COLUMN should impact the index. + */ + private Set<Pair<String, String>> indexedColumnsInfo; + /* + * Map of covered columns where a key is column reference for a column in the data table + * and value is column reference for corresponding column in the index table. + */ + private Map<ColumnReference, ColumnReference> coveredColumnsMap; + /**** END: New member variables added in 4.10 *****/ + private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; this.isDataTableSalted = isDataTableSalted; } - + private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null); assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW); @@ -336,7 +366,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; this.encodingScheme = index.getEncodingScheme(); - this.immutableStorageScheme = index.getImmutableStorageScheme(); + + // null check for b/w compatibility + this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme(); + this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme(); + byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. @@ -385,7 +419,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexTableName = indexTableName; this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); - this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); @@ -417,7 +450,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver); this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); - this.coveredColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns); IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context); for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) { @@ -508,10 +540,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); byte[] dataColumnCq = dataColumn.getColumnQualifierBytes(); byte[] indexColumnCq = indexColumn.getColumnQualifierBytes(); - this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq)); this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); - this.coveredColumnsInfo.add(new Pair<>(dataColumn.getFamilyName().getString(), dataColumn.getName().getString())); } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); @@ -918,27 +948,27 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return indexRowKeySchema; } - + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - Put put = null; + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); + Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts, + this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, // set the value to the empty column name - emptyKeyValueQualifierPtr)); + dataEmptyKeyValueRef.getQualifierWritable())); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (immutableStorageScheme != immutableStorageScheme.ONE_CELL_PER_COLUMN) { + if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { // map from index column family to list of pair of index column and data column (for covered columns) Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap(); for (ColumnReference ref : this.getCoveredColumns()) { - ColumnReference indexColRef = this.coveredColumnsMap.get(ref); - ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily()); + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily()); if (!familyToColListMap.containsKey(cf)) { familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList()); } @@ -956,39 +986,39 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier); // set the values of the columns for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) { - ColumnReference indexColRef = colRefPair.getFirst(); - ColumnReference dataColRef = colRefPair.getSecond(); - Expression expression = new SingleCellColumnExpression(new PDatum() { - @Override - public boolean isNullable() { - return false; - } - - @Override - public SortOrder getSortOrder() { - return null; - } - - @Override - public Integer getScale() { - return null; - } - - @Override - public Integer getMaxLength() { - return null; - } - - @Override - public PDataType getDataType() { - return null; - } - }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme); - ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + ColumnReference indexColRef = colRefPair.getFirst(); + ColumnReference dataColRef = colRefPair.getSecond(); + Expression expression = new SingleCellColumnExpression(new PDatum() { + @Override + public boolean isNullable() { + return false; + } + + @Override + public SortOrder getSortOrder() { + return null; + } + + @Override + public Integer getScale() { + return null; + } + + @Override + public Integer getMaxLength() { + return null; + } + + @Override + public PDataType getDataType() { + return null; + } + }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme); + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); expression.evaluate(new ValueGetterTuple(valueGetter), ptr); byte[] value = ptr.copyBytesIfNecessary(); - if (value != null) { - int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; + if (value != null) { + int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1; colValues[indexArrayPos] = new LiteralExpression(value); } } @@ -1006,20 +1036,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr)); } - } - else { - for (ColumnReference ref : this.getCoveredColumns()) { - //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients. + } else { + for (ColumnReference ref : this.getCoveredColumns()) { ColumnReference indexColRef = this.coveredColumnsMap.get(ref); ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); ImmutableBytesWritable value = valueGetter.getLatestValue(ref); if (value != null) { - if (put == null) { + if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } } @@ -1097,7 +1125,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null); } - @SuppressWarnings("deprecation") public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); // Delete the entire row if any of the indexed columns changed @@ -1107,7 +1134,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Delete delete = new Delete(indexRowKey); for (ColumnReference ref : getCoveredColumns()) { - byte[] family = ref.getFamily(); ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { @@ -1125,11 +1151,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return delete; } Delete delete = null; + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); // Delete columns for missing key values for (Cell kv : pendingUpdates) { if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) { ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier()); - if (coveredColumns.contains(ref)) { + if (dataTableColRefs.contains(ref)) { if (delete == null) { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); @@ -1137,9 +1164,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs. - // Figure out a backward compatible way of going this since coveredColumnsMap won't be available - // for older clients. delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); @@ -1149,13 +1173,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return delete; } - + public byte[] getIndexTableName() { return indexTableName; } public Set<ColumnReference> getCoveredColumns() { - return coveredColumns; + return coveredColumnsMap.keySet(); } public Set<ColumnReference> getAllColumns() { @@ -1168,7 +1192,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // If if there are no covered columns, we know it's our default name return emptyKeyValueCFPtr; } - + + @Deprecated // Only called by code older than our 4.10 release @Override public void readFields(DataInput input) throws IOException { int encodedIndexSaltBucketsAndMultiTenant = WritableUtils.readVInt(input); @@ -1196,17 +1221,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedCoveredolumnsAndLocalIndex = WritableUtils.readVInt(input); isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; - coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { byte[] dataTableCf = Bytes.readByteArray(input); byte[] dataTableCq = Bytes.readByteArray(input); - byte[] indexTableCf = Bytes.readByteArray(input); - byte[] indexTableCq = Bytes.readByteArray(input); - ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq); - coveredColumns.add(dataColumn); - ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq); - coveredColumnsMap.put(dataColumn, indexColumn); + ColumnReference dataTableRef = new ColumnReference(dataTableCf, dataTableCq); + byte[] indexTableCf = isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableCf) : dataTableCf; + byte[] indexTableCq = IndexUtil.getIndexColumnName(dataTableCf, dataTableCq); + ColumnReference indexTableRef = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataTableRef, indexTableRef); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1234,9 +1257,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int numIndexedExpressions = WritableUtils.readVInt(input); indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { - Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); - expression.readFields(input); - indexedExpressions.add(expression); + Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + expression.readFields(input); + indexedExpressions.add(expression); } } else { @@ -1285,26 +1308,82 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { int encodedEstimatedIndexRowKeyBytesAndImmutableRows = WritableUtils.readVInt(input); this.immutableRows = encodedEstimatedIndexRowKeyBytesAndImmutableRows < 0; this.estimatedIndexRowKeyBytes = Math.abs(encodedEstimatedIndexRowKeyBytesAndImmutableRows); - int numCols = WritableUtils.readVInt(input); - //TODO: samarth figure out a backward compatible way of reading/writing indexedColumnsInfo - indexedColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); - for (int i = 1; i <= numCols; i++) { - byte[] dataTableCf = Bytes.readByteArray(input); - byte[] dataTableCq = Bytes.readByteArray(input); - indexedColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + initCachedState(); + } + + + public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException { + IndexMaintainer maintainer = new IndexMaintainer(dataTableRowKeySchema, isDataTableSalted); + maintainer.nIndexSaltBuckets = proto.getSaltBuckets(); + maintainer.isMultiTenant = proto.getIsMultiTenant(); + maintainer.viewIndexId = proto.hasViewIndexId() ? proto.getViewIndexId().toByteArray() : null; + List<ServerCachingProtos.ColumnReference> indexedColumnsList = proto.getIndexedColumnsList(); + maintainer.indexedColumns = new HashSet<ColumnReference>(indexedColumnsList.size()); + for (ServerCachingProtos.ColumnReference colRefFromProto : indexedColumnsList) { + maintainer.indexedColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray())); + } + List<Integer> indexedColumnTypes = proto.getIndexedColumnTypeOrdinalList(); + maintainer.indexedColumnTypes = new ArrayList<PDataType>(indexedColumnTypes.size()); + for (Integer typeOrdinal : indexedColumnTypes) { + maintainer.indexedColumnTypes.add(PDataType.values()[typeOrdinal]); + } + maintainer.indexTableName = proto.getIndexTableName().toByteArray(); + maintainer.rowKeyOrderOptimizable = proto.getRowKeyOrderOptimizable(); + maintainer.dataEmptyKeyValueCF = proto.getDataTableEmptyKeyValueColFamily().toByteArray(); + ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily(); + maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength()); + maintainer.indexedExpressions = new ArrayList<>(); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getIndexedExpressions().toByteArray())) { + DataInput input = new DataInputStream(stream); + while (stream.available() > 0) { + int expressionOrdinal = WritableUtils.readVInt(input); + Expression expression = ExpressionType.values()[expressionOrdinal].newInstance(); + expression.readFields(input); + maintainer.indexedExpressions.add(expression); + } } - coveredColumnsInfo = Sets.newHashSetWithExpectedSize(numCols); - int numCoveredCols = WritableUtils.readVInt(input); - for (int i = 1; i <= numCoveredCols; i++) { - byte[] dataTableCf = Bytes.readByteArray(input); - byte[] dataTableCq = Bytes.readByteArray(input); - coveredColumnsInfo.add(new Pair<>(Bytes.toString(dataTableCf), Bytes.toString(dataTableCq))); + maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, maintainer.isMultiTenant); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) { + DataInput input = new DataInputStream(stream); + maintainer.rowKeyMetaData.readFields(input); + } + maintainer.nDataCFs = proto.getNumDataTableColFamilies(); + maintainer.indexWALDisabled = proto.getIndexWalDisabled(); + maintainer.estimatedIndexRowKeyBytes = proto.getIndexRowKeyByteSize(); + maintainer.immutableRows = proto.getImmutable(); + List<ColumnInfo> indexedColumnInfoList = proto.getIndexedColumnInfoList(); + maintainer.indexedColumnsInfo = Sets.newHashSet(); + for (ColumnInfo info : indexedColumnInfoList) { + maintainer.indexedColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName())); + } + // proto doesn't support single byte so need an explicit cast here + maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme()); + maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme()); + maintainer.isLocalIndex = proto.getIsLocalIndex(); + + List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList(); + List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList(); + maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size()); + boolean encodedColumnNames = maintainer.encodingScheme != NON_ENCODED_QUALIFIERS; + Iterator<ServerCachingProtos.ColumnReference> indexTableColRefItr = indexTableColRefsForCoveredColumnsList.iterator(); + for (ServerCachingProtos.ColumnReference colRefFromProto : dataTableColRefsForCoveredColumnsList) { + ColumnReference dataTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier( ).toByteArray()); + ColumnReference indexTableColRef; + if (encodedColumnNames) { + ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); + indexTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( ).toByteArray()); + } else { + byte[] cq = IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), dataTableColRef.getQualifier()); + byte[] cf = maintainer.isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableColRef.getFamily()) : dataTableColRef.getFamily(); + indexTableColRef = new ColumnReference(cf, cq); + } + maintainer.coveredColumnsMap.put(dataTableColRef, indexTableColRef); } - encodingScheme = WritableUtils.readEnum(input, QualifierEncodingScheme.class); - immutableStorageScheme = WritableUtils.readEnum(input, ImmutableStorageScheme.class); - initCachedState(); + maintainer.initCachedState(); + return maintainer; } + @Deprecated // Only called by code older than our 4.10 release @Override public void write(DataOutput output) throws IOException { // Encode nIndexSaltBuckets and isMultiTenant together @@ -1324,14 +1403,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, type.ordinal()); } // Encode coveredColumns.size() and whether or not this is a local index - WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1)); - for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) { - ColumnReference dataColumn = ref.getKey(); - ColumnReference indexColumn = ref.getValue(); - Bytes.writeByteArray(output, dataColumn.getFamily()); - Bytes.writeByteArray(output, dataColumn.getQualifier()); - Bytes.writeByteArray(output, indexColumn.getFamily()); - Bytes.writeByteArray(output, indexColumn.getQualifier()); + WritableUtils.writeVInt(output, (coveredColumnsMap.size() + 1) * (isLocalIndex ? -1 : 1)); + for (ColumnReference ref : coveredColumnsMap.keySet()) { + Bytes.writeByteArray(output, ref.getFamily()); + Bytes.writeByteArray(output, ref.getQualifier()); } // TODO: remove when rowKeyOrderOptimizable hack no longer needed WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1)); @@ -1341,10 +1416,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // when indexedColumnTypes is removed, remove this WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength()); output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength()); + WritableUtils.writeVInt(output, indexedExpressions.size()); for (Expression expression : indexedExpressions) { - WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); - expression.write(output); + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); } rowKeyMetaData.write(output); @@ -1352,18 +1428,76 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, (nDataCFs + 1) * (indexWALDisabled ? -1 : 1)); // Encode estimatedIndexRowKeyBytes and immutableRows together. WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1)); - WritableUtils.writeVInt(output, indexedColumnsInfo.size()); - for (Pair<String, String> colInfo : indexedColumnsInfo) { - Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : colInfo.getFirst().getBytes()); - Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); - } - WritableUtils.writeVInt(output, coveredColumnsInfo.size()); - for (Pair<String, String> colInfo : coveredColumnsInfo) { - Bytes.writeByteArray(output, colInfo.getFirst() == null ? null : colInfo.getFirst().getBytes()); - Bytes.writeByteArray(output, colInfo.getSecond().getBytes()); - } - WritableUtils.writeEnum(output, encodingScheme); - WritableUtils.writeEnum(output, immutableStorageScheme); + } + + public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer maintainer) throws IOException { + ServerCachingProtos.IndexMaintainer.Builder builder = ServerCachingProtos.IndexMaintainer.newBuilder(); + builder.setSaltBuckets(maintainer.nIndexSaltBuckets); + builder.setIsMultiTenant(maintainer.isMultiTenant); + if (maintainer.viewIndexId != null) { + builder.setViewIndexId(ByteStringer.wrap(maintainer.viewIndexId)); + } + for (ColumnReference colRef : maintainer.indexedColumns) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier())); + builder.addIndexedColumns(cRefBuilder.build()); + } + for (PDataType dataType : maintainer.indexedColumnTypes) { + builder.addIndexedColumnTypeOrdinal(dataType.ordinal()); + } + for (Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) { + ServerCachingProtos.ColumnReference.Builder cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + ColumnReference dataTableColRef = e.getKey(); + cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier())); + builder.addDataTableColRefForCoveredColumns(cRefBuilder.build()); + if (maintainer.encodingScheme != NON_ENCODED_QUALIFIERS) { + // We need to serialize the colRefs of index tables only in case of encoded column names. + ColumnReference indexTableColRef = e.getValue(); + cRefBuilder = ServerCachingProtos.ColumnReference.newBuilder(); + cRefBuilder.setFamily(ByteStringer.wrap(indexTableColRef.getFamily())); + cRefBuilder.setQualifier(ByteStringer.wrap(indexTableColRef.getQualifier())); + builder.addIndexTableColRefForCoveredColumns(cRefBuilder.build()); + } + } + builder.setIsLocalIndex(maintainer.isLocalIndex); + builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName)); + builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable); + builder.setDataTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.dataEmptyKeyValueCF)); + ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder(); + ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get())); + ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength()); + ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset()); + builder.setEmptyKeyValueColFamily(ibwBuilder.build()); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + for (Expression expression : maintainer.indexedExpressions) { + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); + } + builder.setIndexedExpressions(ByteStringer.wrap(stream.toByteArray())); + } + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + DataOutput output = new DataOutputStream(stream); + maintainer.rowKeyMetaData.write(output); + builder.setRowKeyMetadata(ByteStringer.wrap(stream.toByteArray())); + } + builder.setNumDataTableColFamilies(maintainer.nDataCFs); + builder.setIndexWalDisabled(maintainer.indexWALDisabled); + builder.setIndexRowKeyByteSize(maintainer.estimatedIndexRowKeyBytes); + builder.setImmutable(maintainer.immutableRows); + for (Pair<String, String> p : maintainer.indexedColumnsInfo) { + ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder(); + if (p.getFirst() != null) { + ciBuilder.setFamilyName(p.getFirst()); + } + ciBuilder.setColumnName(p.getSecond()); + builder.addIndexedColumnInfo(ciBuilder.build()); + } + builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue()); + builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue()); + return builder.build(); } public int getEstimatedByteSize() { @@ -1381,8 +1515,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { PDataType type = indexedColumnTypes.get(i); size += WritableUtils.getVIntSize(type.ordinal()); } - size += WritableUtils.getVIntSize(coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { + Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet(); + size += WritableUtils.getVIntSize(dataTableColRefs.size()); + for (ColumnReference ref : dataTableColRefs) { size += WritableUtils.getVIntSize(ref.getFamilyWritable().getSize()); size += ref.getFamily().length; size += WritableUtils.getVIntSize(ref.getQualifierWritable().getSize()); @@ -1412,8 +1547,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private void initCachedState() { byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); - emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier); - this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); + this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); for (Expression expression : indexedExpressions) { @@ -1429,7 +1563,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { expression.accept(visitor); } allColumns.addAll(indexedColumns); - allColumns.addAll(coveredColumns); + allColumns.addAll(coveredColumnsMap.keySet()); int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0); int nIndexPkColumns = getIndexPkColumnCount(); @@ -1473,12 +1607,21 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private int getIndexPkColumnCount() { - return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); + return getIndexPkColumnCount(dataRowKeySchema, indexedExpressions.size(), isDataTableSalted, isMultiTenant); + } + + private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + return rowKeySchema.getFieldCount() + numIndexExpressions - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0); } private RowKeyMetaData newRowKeyMetaData() { return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : new IntSizedRowKeyMetaData(); } + + private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) { + int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, numIndexExpressions, isDataTableSalted, isMultiTenant); + return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : i.new IntSizedRowKeyMetaData(); + } private RowKeyMetaData newRowKeyMetaData(int capacity) { return capacity < 0xFF ? new ByteSizeRowKeyMetaData(capacity) : new IntSizedRowKeyMetaData(capacity); @@ -1687,11 +1830,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public byte[] getEmptyKeyValueQualifier() { - return emptyKeyValueQualifierPtr.copyBytes(); - } - - public Set<Pair<String, String>> getCoveredColumnInfo() { - return coveredColumnsInfo; + return dataEmptyKeyValueRef.getQualifier(); } public Set<Pair<String, String>> getIndexedColumnInfo() { @@ -1701,4 +1840,5 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public ImmutableStorageScheme getIndexStorageScheme() { return immutableStorageScheme; } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index 05a01b9..fcabdfd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -93,4 +93,5 @@ public class IndexMetaDataCacheClient { */ return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 56849fe..9edcafc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -47,10 +47,10 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException { + public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast final List<IndexMaintainer> maintainers = - IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE); + IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); final Transaction txn; try { txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 9d2955b..4116101 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -38,6 +38,7 @@ import com.google.common.collect.Lists; */ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; + public static final String INDEX_PROTO_MD = "IdxProtoMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index d22e957..39473dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -47,10 +47,15 @@ public class PhoenixIndexMetaData implements IndexMetaData { if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID); if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } - byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD); + boolean useProto = false; + byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD); + useProto = md != null; + if (md == null) { + md = attributes.get(PhoenixIndexCodec.INDEX_MD); + } byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { - final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); + final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); final Transaction txn = MutationState.decodeTransaction(txState); return new IndexMetaDataCache() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 2d7550a..1985c2e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -69,7 +69,7 @@ public class HashCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException { + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { try { // This reads the uncompressed length from the front of the compressed input int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 81c3e75..1de57cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -156,6 +156,7 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -2178,7 +2179,7 @@ public class MetaDataClient { Integer encodedCQ = isPkColumn ? null : cqCounter.getNextQualifier(cqCounterFamily); byte[] columnQualifierBytes = null; try { - columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), encodedCQ, encodingScheme); + columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(columnDefName.getColumnName(), encodedCQ, encodingScheme, isPkColumn); } catch (QualifierOutOfRangeException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED) @@ -3268,7 +3269,7 @@ public class MetaDataClient { } byte[] columnQualifierBytes = null; try { - columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table); + columnQualifierBytes = EncodedColumnsUtil.getColumnQualifierBytes(colDef.getColumnDefName().getColumnName(), encodedCQ, table, colDef.isPK()); } catch (QualifierOutOfRangeException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_COLUMNS_EXCEEDED) @@ -3604,9 +3605,11 @@ public class MetaDataClient { // get the covered columns List<PColumn> indexColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size()); Set<Pair<String, String>> indexedColsInfo = indexMaintainer.getIndexedColumnInfo(); - Set<Pair<String, String>> coveredColsInfo = indexMaintainer.getCoveredColumnInfo(); + Set<ColumnReference> coveredCols = indexMaintainer.getCoveredColumns(); for(PColumn columnToDrop : tableColumnsToDrop) { Pair<String, String> columnToDropInfo = new Pair<>(columnToDrop.getFamilyName().getString(), columnToDrop.getName().getString()); + ColumnReference colDropRef = new ColumnReference(columnToDrop.getFamilyName() == null ? null + : columnToDrop.getFamilyName().getBytes(), columnToDrop.getColumnQualifierBytes()); boolean isColumnIndexed = indexedColsInfo.contains(columnToDropInfo); if (isColumnIndexed) { if (index.getViewIndexId() == null) { @@ -3614,8 +3617,7 @@ public class MetaDataClient { } connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, index.getName().getString()), index.getParentName() == null ? null : index.getParentName().getString(), index.getTimeStamp()); removedIndexTableOrColumn = true; - } - else if (coveredColsInfo.contains(columnToDropInfo)) { + } else if (coveredCols.contains(colDropRef)) { String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); PColumn indexColumn = index.getPColumnForColumnName(indexColumnName); indexColumnsToDrop.add(indexColumn); http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java index 16054b4..78baa4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Preconditions; @@ -210,6 +211,10 @@ public class PColumnImpl implements PColumn { @Override public byte[] getColumnQualifierBytes() { + // Needed for backward compatibility + if (!SchemaUtil.isPKColumn(this) && columnQualifierBytes == null) { + return this.name.getBytes(); + } return columnQualifierBytes; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java index e99003f..5a5b355 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/EncodedColumnQualiferCellsList.java @@ -361,6 +361,11 @@ public class EncodedColumnQualiferCellsList implements List<Cell> { return getCellForColumnQualifier(columnQualifier); } + public Cell getCellForColumnQualifier(byte[] qualifierBytes, int offset, int length) { + int columnQualifier = encodingScheme.decode(qualifierBytes, offset, length); + return getCellForColumnQualifier(columnQualifier); + } + private Cell getCellForColumnQualifier(int columnQualifier) { checkQualifierRange(columnQualifier); int idx = getArrayIndex(columnQualifier); http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java index ce2c98c..725161a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java @@ -66,11 +66,13 @@ public class EncodedColumnsUtil { } public static QualifierEncodingScheme getQualifierEncodingScheme(Scan s) { - return QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]); + // null check for backward compatibility + return s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME) == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME)[0]); } public static ImmutableStorageScheme getImmutableStorageScheme(Scan s) { - return ImmutableStorageScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME)[0]); + // null check for backward compatibility + return s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME) == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme.fromSerializedValue(s.getAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME)[0]); } /** @@ -157,18 +159,19 @@ public class EncodedColumnsUtil { return toReturn; } - public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, PTable table) { + public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, PTable table, boolean isPk) { QualifierEncodingScheme encodingScheme = table.getEncodingScheme(); - return getColumnQualifierBytes(columnName, numberBasedQualifier, encodingScheme); + return getColumnQualifierBytes(columnName, numberBasedQualifier, encodingScheme, isPk); } - public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, QualifierEncodingScheme encodingScheme) { - if (encodingScheme == NON_ENCODED_QUALIFIERS) { + public static byte[] getColumnQualifierBytes(String columnName, Integer numberBasedQualifier, QualifierEncodingScheme encodingScheme, boolean isPk) { + if (isPk) { + return null; + } + if (encodingScheme == null || encodingScheme == NON_ENCODED_QUALIFIERS) { return Bytes.toBytes(columnName); - } else if (numberBasedQualifier != null) { - return encodingScheme.encode(numberBasedQualifier); } - return null; + return encodingScheme.encode(numberBasedQualifier); } public static Expression[] createColumnExpressionArray(int maxEncodedColumnQualifier) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 217c99e..9c7f9ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -181,6 +181,11 @@ public class IndexUtil { : QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX + dataColumnFamilyName; } + public static byte[] getLocalIndexColumnFamily(byte[] dataColumnFamilyBytes) { + String dataCF = Bytes.toString(dataColumnFamilyBytes); + return getLocalIndexColumnFamily(dataCF).getBytes(); + } + public static PColumn getDataColumn(PTable dataTable, String indexColumnName) { int pos = indexColumnName.indexOf(INDEX_COLUMN_NAME_SEP); if (pos < 0) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java index ac2a850..ade5239 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java @@ -47,7 +47,7 @@ public class TenantCacheTest { TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive); ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory); + newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); newTenantCache.removeServerCache(cacheId); assertEquals(maxBytes, memoryManager.getAvailableMemory()); @@ -63,7 +63,7 @@ public class TenantCacheTest { TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; cache.cleanUp(); @@ -91,7 +91,7 @@ public class TenantCacheTest { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { return chunk; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 5887e5b..0d4a52f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -109,7 +109,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { PTable index = pconn.getTable(new PTableKey(pconn.getTenantId(),fullIndexName)); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); table.getIndexMaintainers(ptr, pconn); - List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder); + List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder, true); assertEquals(1,c1.size()); IndexMaintainer im1 = c1.get(0); @@ -310,7 +310,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), "FHA")); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); table.getIndexMaintainers(ptr, pconn); - List<IndexMaintainer> indexMaintainerList = IndexMaintainer.deserialize(ptr, GenericKeyValueBuilder.INSTANCE); + List<IndexMaintainer> indexMaintainerList = IndexMaintainer.deserialize(ptr, GenericKeyValueBuilder.INSTANCE, true); assertEquals(1,indexMaintainerList.size()); IndexMaintainer indexMaintainer = indexMaintainerList.get(0); Set<ColumnReference> indexedColumns = indexMaintainer.getIndexedColumns(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/42d04927/phoenix-protocol/src/main/ServerCachingService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto index a45b18f..044c111 100644 --- a/phoenix-protocol/src/main/ServerCachingService.proto +++ b/phoenix-protocol/src/main/ServerCachingService.proto @@ -30,12 +30,47 @@ message ImmutableBytesWritable { required int32 length = 3; } +message ColumnReference { + required bytes family = 1; + required bytes qualifier = 2; +} + +message ColumnInfo { + optional string familyName = 1; + required string columnName = 2; +} + +message IndexMaintainer { + required int32 saltBuckets = 1; + required bool isMultiTenant = 2; + optional bytes viewIndexId = 3; + repeated ColumnReference indexedColumns = 4; + repeated int32 indexedColumnTypeOrdinal = 5; + repeated ColumnReference dataTableColRefForCoveredColumns = 6; + repeated ColumnReference indexTableColRefForCoveredColumns = 7; + required bool isLocalIndex = 8; + required bytes indexTableName = 9; + required bool rowKeyOrderOptimizable = 10; + required bytes dataTableEmptyKeyValueColFamily = 11; + required ImmutableBytesWritable emptyKeyValueColFamily = 12; + optional bytes indexedExpressions = 13; + required bytes rowKeyMetadata = 14; + required int32 numDataTableColFamilies = 15; + required bool indexWalDisabled = 16; + required int32 indexRowKeyByteSize = 17; + required bool immutable = 18; + repeated ColumnInfo indexedColumnInfo = 19; + required int32 encodingScheme = 20; + required int32 immutableStorageScheme = 21; +} + message AddServerCacheRequest { optional bytes tenantId = 1; required bytes cacheId = 2; required ImmutableBytesWritable cachePtr = 3; required ServerCacheFactory cacheFactory = 4; optional bytes txState = 5; + optional bool hasProtoBufIndexMaintainer = 6; } message AddServerCacheResponse {
