http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 0956753..a12f633 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -59,14 +59,10 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.parse.AndParseNode; -import org.apache.phoenix.parse.BaseParseNodeVisitor; -import org.apache.phoenix.parse.BooleanParseNodeVisitor; import org.apache.phoenix.parse.FunctionParseNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; -import org.apache.phoenix.parse.TraverseAllParseNodeVisitor; import org.apache.phoenix.parse.UDFParseNode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -265,6 +261,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private int[] dataPkPosition; private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; + private boolean rowKeyOrderOptimizable; private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; @@ -273,6 +270,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) { this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != 
null); + this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable(); this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; @@ -434,7 +432,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { dataRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset); output.write(ptr.get(), ptr.getOffset(), ptr.getLength()); if (!dataRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) { - output.writeByte(QueryConstants.SEPARATOR_BYTE); + output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength()==0, dataRowKeySchema.getField(dataPosOffset))); } dataPosOffset++; } @@ -481,21 +479,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } boolean isDataColumnInverted = dataSortOrder != SortOrder.ASC; PDataType indexColumnType = IndexUtil.getIndexColumnDataType(isNullable, dataColumnType); - boolean isBytesComparable = dataColumnType.isBytesComparableWith(indexColumnType) ; - if (isBytesComparable && isDataColumnInverted == descIndexColumnBitSet.get(i)) { + boolean isBytesComparable = dataColumnType.isBytesComparableWith(indexColumnType); + boolean isIndexColumnDesc = descIndexColumnBitSet.get(i); + if (isBytesComparable && isDataColumnInverted == isIndexColumnDesc) { output.write(ptr.get(), ptr.getOffset(), ptr.getLength()); } else { if (!isBytesComparable) { indexColumnType.coerceBytes(ptr, dataColumnType, dataSortOrder, SortOrder.getDefault()); } - if (descIndexColumnBitSet.get(i) != isDataColumnInverted) { + if (isDataColumnInverted != isIndexColumnDesc) { writeInverted(ptr.get(), ptr.getOffset(), ptr.getLength(), output); } else { output.write(ptr.get(), ptr.getOffset(), ptr.getLength()); } } if (!indexColumnType.isFixedWidth()) { - output.writeByte(QueryConstants.SEPARATOR_BYTE); + 
output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, isIndexColumnDesc ? SortOrder.DESC : SortOrder.ASC)); } } int length = stream.size(); @@ -545,7 +544,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { indexRowKeySchema.next(ptr, indexPosOffset, maxRowKeyOffset); output.write(ptr.get(), ptr.getOffset(), ptr.getLength()); if (!dataRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) { - output.writeByte(QueryConstants.SEPARATOR_BYTE); + output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, dataRowKeySchema.getField(dataPosOffset))); } indexPosOffset++; dataPosOffset++; @@ -587,8 +586,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } } } - if (!dataRowKeySchema.getField(i).getDataType().isFixedWidth() && ((i+1) != dataRowKeySchema.getFieldCount())) { - output.writeByte(QueryConstants.SEPARATOR_BYTE); + // Write separator byte if variable length unless it's the last field in the schema + // (but we still need to write it if it's DESC to ensure sort order is correct). + byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, dataRowKeySchema.getField(i)); + if (!dataRowKeySchema.getField(i).getDataType().isFixedWidth() && (((i+1) != dataRowKeySchema.getFieldCount()) || sepByte == QueryConstants.DESC_SEPARATOR_BYTE)) { + output.writeByte(sepByte); } } int length = stream.size(); @@ -658,6 +660,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private RowKeySchema generateIndexRowKeySchema() { int nIndexedColumns = getIndexPkColumnCount() + (isMultiTenant ? 1 : 0) + (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (viewIndexId != null ? 
1 : 0) - getNumViewConstants(); RowKeySchema.RowKeySchemaBuilder builder = new RowKeySchema.RowKeySchemaBuilder(nIndexedColumns); + builder.rowKeyOrderOptimizable(rowKeyOrderOptimizable); if (!isLocalIndex && nIndexSaltBuckets > 0) { builder.addField(SaltingUtil.SALTING_COLUMN, false, SortOrder.ASC); nIndexedColumns--; @@ -708,44 +711,67 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // same for all rows in this index) if (!viewConstantColumnBitSet.get(i)) { int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset); - indexFields[pos] = dataRowKeySchema.getField(i); + Field dataField = dataRowKeySchema.getField(i); + indexFields[pos] = + dataRowKeySchema.getField(i); } } + BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet(); Iterator<Expression> expressionItr = indexedExpressions.iterator(); - for (Field indexField : indexFields) { - if (indexField == null) { // Add field for kv column in index - final PDataType dataType = expressionItr.next().getDataType(); - builder.addField(new PDatum() { + for (int i = 0; i < indexFields.length; i++) { + Field indexField = indexFields[i]; + PDataType dataTypeToBe; + SortOrder sortOrderToBe; + boolean isNullableToBe; + Integer maxLengthToBe; + Integer scaleToBe; + if (indexField == null) { + Expression e = expressionItr.next(); + isNullableToBe = true; + dataTypeToBe = IndexUtil.getIndexColumnDataType(isNullableToBe, e.getDataType()); + sortOrderToBe = descIndexColumnBitSet.get(i) ? SortOrder.DESC : SortOrder.ASC; + maxLengthToBe = e.getMaxLength(); + scaleToBe = e.getScale(); + } else { + isNullableToBe = indexField.isNullable(); + dataTypeToBe = IndexUtil.getIndexColumnDataType(isNullableToBe, indexField.getDataType()); + sortOrderToBe = descIndexColumnBitSet.get(i) ? 
SortOrder.DESC : SortOrder.ASC; + maxLengthToBe = indexField.getMaxLength(); + scaleToBe = indexField.getScale(); + } + final PDataType dataType = dataTypeToBe; + final SortOrder sortOrder = sortOrderToBe; + final boolean isNullable = isNullableToBe; + final Integer maxLength = maxLengthToBe; + final Integer scale = scaleToBe; + builder.addField(new PDatum() { - @Override - public boolean isNullable() { - return true; - } + @Override + public boolean isNullable() { + return isNullable; + } - @Override - public PDataType getDataType() { - return IndexUtil.getIndexColumnDataType(true, dataType); - } + @Override + public PDataType getDataType() { + return dataType; + } - @Override - public Integer getMaxLength() { - return null; - } + @Override + public Integer getMaxLength() { + return maxLength; + } - @Override - public Integer getScale() { - return null; - } + @Override + public Integer getScale() { + return scale; + } - @Override - public SortOrder getSortOrder() { - return SortOrder.getDefault(); - } - - }, true, SortOrder.getDefault()); - } else { // add field from data row key - builder.addField(indexField); - } + @Override + public SortOrder getSortOrder() { + return sortOrder; + } + + }, true, sortOrder); } return builder.build(); } @@ -928,9 +954,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { byte[] cq = Bytes.readByteArray(input); coveredColumns.add(new ColumnReference(cf,cq)); } - indexTableName = Bytes.readByteArray(input); - dataEmptyKeyValueCF = Bytes.readByteArray(input); + // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); + if (len < 0) { + rowKeyOrderOptimizable = false; + len *= -1; + } else { + rowKeyOrderOptimizable = true; + } + indexTableName = new byte[len]; + input.readFully(indexTableName, 0, len); + dataEmptyKeyValueCF = Bytes.readByteArray(input); + len = WritableUtils.readVInt(input); //TODO remove this in the next major release boolean isNewClient = 
false; if (len < 0) { @@ -1023,7 +1058,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Bytes.writeByteArray(output, ref.getFamily()); Bytes.writeByteArray(output, ref.getQualifier()); } - Bytes.writeByteArray(output, indexTableName); + // TODO: remove when rowKeyOrderOptimizable hack no longer needed + WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1)); + output.write(indexTableName, 0, indexTableName.length); Bytes.writeByteArray(output, dataEmptyKeyValueCF); // TODO in order to maintain b/w compatibility encode emptyKeyValueCFPtr.getLength() as a negative value (so we can distinguish between new and old clients) // when indexedColumnTypes is removed, remove this
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index cf66d93..a0aefaa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -51,6 +51,8 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.filter.ColumnProjectionFilter; @@ -67,7 +69,6 @@ import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; -import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; @@ -134,6 +135,68 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return true; } + private static void initializeScan(QueryPlan plan, Integer perScanLimit) { + StatementContext context = plan.getContext(); + TableRef tableRef = plan.getTableRef(); + PTable table = tableRef.getTable(); + Scan scan = context.getScan(); + + Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); + // Hack for PHOENIX-2067 to force raw scan over all 
KeyValues to fix their row keys + if (context.getConnection().isDescVarLengthRowKeyUpgrade()) { + // We project *all* KeyValues across all column families as we make a pass over + // a physical table and we want to make sure we catch all KeyValues that may be + // dynamic or part of an updatable view. + familyMap.clear(); + scan.setMaxVersions(); + scan.setFilter(null); // Remove any filter + scan.setRaw(true); // Traverse (and subsequently clone) all KeyValues + // Pass over PTable so we can re-write rows according to the row key schema + scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, UngroupedAggregateRegionObserver.serialize(table)); + } else { + FilterableStatement statement = plan.getStatement(); + RowProjector projector = plan.getProjector(); + boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty(); + if (projector.isProjectEmptyKeyValue()) { + // If nothing projected into scan and we only have one column family, just allow everything + // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to + // be quite a bit faster. + // Where condition columns also will get added into familyMap + // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. + if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() + && table.getColumnFamilies().size() == 1) { + // Project the one column family. We must project a column family since it's possible + // that there are other non declared column families that we need to ignore. + scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); + } else { + byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); + // Project empty key value unless the column family containing it has + // been projected in its entirety. 
+ if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { + scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + } + } + } else if (table.getViewType() == ViewType.MAPPED) { + // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the + // selected column values are returned back to client + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } + // Add FirstKeyOnlyFilter if there are no references to key value columns + if (keyOnlyFilter) { + ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); + } + + // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. + if (perScanLimit != null) { + ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); + } + + doColumnProjectionOptimization(context, scan, table, statement); + } + } + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit()); this.plan = plan; @@ -141,52 +204,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result StatementContext context = plan.getContext(); TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); - FilterableStatement statement = plan.getStatement(); - RowProjector projector = plan.getProjector(); physicalTableName = table.getPhysicalName().getBytes(); tableStats = useStats() ? 
new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; - Scan scan = context.getScan(); // Used to tie all the scans together during logging scanId = UUID.randomUUID().toString(); - Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); - boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty(); - if (projector.isProjectEmptyKeyValue()) { - // If nothing projected into scan and we only have one column family, just allow everything - // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to - // be quite a bit faster. - // Where condition columns also will get added into familyMap - // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. - if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() - && table.getColumnFamilies().size() == 1) { - // Project the one column family. We must project a column family since it's possible - // that there are other non declared column families that we need to ignore. - scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); - } else { - byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); - // Project empty key value unless the column family containing it has - // been projected in its entirety. - if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); - } - } - } else if (table.getViewType() == ViewType.MAPPED) { - // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. 
But only the - // selected column values are returned back to client - for (PColumnFamily family : table.getColumnFamilies()) { - scan.addFamily(family.getName().getBytes()); - } - } - // Add FirstKeyOnlyFilter if there are no references to key value columns - if (keyOnlyFilter) { - ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); - } - // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. - if (perScanLimit != null) { - ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); - } - - doColumnProjectionOptimization(context, scan, table, statement); + initializeScan(plan, perScanLimit); this.scans = getParallelScans(); List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); @@ -200,7 +223,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result this.allFutures = Lists.newArrayListWithExpectedSize(1); } - private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { + private static void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); if (familyMap != null && !familyMap.isEmpty()) { // columnsTracker contain cf -> qualifiers which should get returned. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 7b7d4dc..d999ecb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -27,15 +27,17 @@ import java.util.List; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.expression.OrderByExpression; -import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.SizedUtil; /** * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause. @@ -155,7 +157,12 @@ public class OrderedResultIterator implements PeekingResultIterator { Ordering<ResultEntry> ordering = null; int pos = 0; for (OrderByExpression col : orderByExpressions) { - Ordering<ImmutableBytesWritable> o = Ordering.from(new ImmutableBytesWritable.Comparator()); + Expression e = col.getExpression(); + Comparator<ImmutableBytesWritable> comparator = + e.getSortOrder() == SortOrder.DESC && !e.getDataType().isFixedWidth() + ? 
buildDescVarLengthComparator() + : new ImmutableBytesWritable.Comparator(); + Ordering<ImmutableBytesWritable> o = Ordering.from(comparator); if(!col.isAscending()) o = o.reverse(); o = col.isNullsLast() ? o.nullsLast() : o.nullsFirst(); Ordering<ResultEntry> entryOrdering = o.onResultOf(new NthKey(pos++)); @@ -164,6 +171,23 @@ public class OrderedResultIterator implements PeekingResultIterator { return ordering; } + /* + * Same as regular comparator, but if all the bytes match and the length is + * different, returns the longer length as bigger. + */ + private static Comparator<ImmutableBytesWritable> buildDescVarLengthComparator() { + return new Comparator<ImmutableBytesWritable>() { + + @Override + public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) { + return DescVarLengthFastByteComparisons.compareTo( + o1.get(), o1.getOffset(), o1.getLength(), + o2.get(), o2.getOffset(), o2.getLength()); + } + + }; + } + @Override public Tuple next() throws SQLException { return getResultIterator().next(); @@ -252,13 +276,13 @@ public class OrderedResultIterator implements PeekingResultIterator { planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? 
"" : "S")) + " SORTED BY " + orderByExpressions.toString()); } - @Override - public String toString() { - return "OrderedResultIterator [thresholdBytes=" + thresholdBytes - + ", limit=" + limit + ", delegate=" + delegate - + ", orderByExpressions=" + orderByExpressions - + ", estimatedByteSize=" + estimatedByteSize - + ", resultIterator=" + resultIterator + ", byteSize=" - + byteSize + "]"; - } + @Override + public String toString() { + return "OrderedResultIterator [thresholdBytes=" + thresholdBytes + + ", limit=" + limit + ", delegate=" + delegate + + ", orderByExpressions=" + orderByExpressions + + ", estimatedByteSize=" + estimatedByteSize + + ", resultIterator=" + resultIterator + ", byteSize=" + + byteSize + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 5805999..0cd76fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -140,6 +140,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private Consistency consistency = Consistency.STRONG; private Map<String, String> customTracingAnnotations = emptyMap(); private final boolean isRequestLevelMetricsEnabled; + private final boolean isDescVarLengthRowKeyUpgrade; + + static { Tracing.addTraceMetricsSource(); } @@ -150,28 +153,35 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return props; } - public PhoenixConnection(PhoenixConnection connection) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache()); + public 
PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException { + this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), isDescRowKeyOrderUpgrade); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; this.statementExecutionCounter = connection.statementExecutionCounter; } + + public PhoenixConnection(PhoenixConnection connection) throws SQLException { + this(connection, connection.isDescVarLengthRowKeyUpgrade); + } public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { this(connection.getQueryServices(), connection, scn); - this.sampler = connection.sampler; - this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache()); + this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.isDescVarLengthRowKeyUpgrade()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { + this(services, url, info, metaData, false); + } + + public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, boolean isDescVarLengthRowKeyUpgrade) throws SQLException { this.url = url; + this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; // Copy so client cannot change this.info = info == null ? 
new Properties() : PropertiesUtil.deepCopy(info); final PName tenantId = JDBCUtil.getTenantId(url, info); @@ -887,4 +897,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd mutationState.getReadMetricQueue().clearMetrics(); } } + + /** + * Returns true if this connection is being used to upgrade the + * data due to PHOENIX-2067 and false otherwise. + * @return + */ + public boolean isDescVarLengthRowKeyUpgrade() { + return isDescVarLengthRowKeyUpgrade; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 9cd32e8..feb5989 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -162,6 +162,7 @@ import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.cache.Cache; @@ -1831,7 +1832,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement * This closes the passed connection. 
*/ private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException { - Properties props = new Properties(oldMetaConnection.getClientInfo()); + Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo()); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp)); // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache()); @@ -1970,6 +1971,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement */ logger.debug("No need to run 4.5 upgrade"); } + Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); + props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache()); + try { + Set<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescVarLengthRowKey(conn); + if (!tablesNeedingUpgrade.isEmpty()) { + logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns (PHOENIX-2067):\n" + Joiner.on(' ').join(tablesNeedingUpgrade)); + } + } catch (Exception ex) { + logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex); + } finally { + conn.close(); + } } } int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java index afcc741..bca55e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.query; -import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -394,18 +392,6 @@ public class KeyRange implements Writable { return lowerRange == KeyRange.EMPTY_RANGE.getLowerRange() && upperRange == KeyRange.EMPTY_RANGE.getUpperRange(); } - public KeyRange appendSeparator() { - byte[] lowerBound = getLowerRange(); - byte[] upperBound = getUpperRange(); - if (lowerBound != UNBOUND) { - lowerBound = ByteUtil.concat(lowerBound, SEPARATOR_BYTE_ARRAY); - } - if (upperBound != UNBOUND) { - upperBound = ByteUtil.concat(upperBound, SEPARATOR_BYTE_ARRAY); - } - return getKeyRange(lowerBound, lowerInclusive, upperBound, upperInclusive); - } - /** * @return list of at least size 1 */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index d095049..92479b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -114,6 +114,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.SortOrder; import 
org.apache.phoenix.util.ByteUtil; @@ -163,6 +164,8 @@ public interface QueryConstants { */ public static final byte SEPARATOR_BYTE = (byte) 0; public static final byte[] SEPARATOR_BYTE_ARRAY = new byte[] {SEPARATOR_BYTE}; + public static final byte DESC_SEPARATOR_BYTE = SortOrder.invert(SEPARATOR_BYTE); + public static final byte[] DESC_SEPARATOR_BYTE_ARRAY = new byte[] {DESC_SEPARATOR_BYTE}; public static final String DEFAULT_COPROCESS_PATH = "phoenix.jar"; public final static int MILLIS_IN_DAY = 1000 * 60 * 60 * 24; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 2a43679..0251da1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -241,4 +241,9 @@ public class DelegateTable implements PTable { public int getBaseColumnCount() { return delegate.getBaseColumnCount(); } + + @Override + public boolean rowKeyOrderOptimizable() { + return delegate.rowKeyOrderOptimizable(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 9ad52a5..b1fcf30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -189,6 +189,7 @@ import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import 
org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -702,8 +703,13 @@ public class MetaDataClient { sortOrder = pkSortOrder.getSecond(); } } - String columnName = columnDefName.getColumnName(); + if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED) + .setColumnName(columnName) + .build().buildException(); + } + PName familyName = null; if (def.isPK() && !pkConstraint.getColumnNames().isEmpty() ) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS) @@ -1432,6 +1438,8 @@ public class MetaDataClient { String parentTableName = null; PName tenantId = connection.getTenantId(); String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString(); + Long scn = connection.getSCN(); + long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; boolean multiTenant = false; boolean storeNulls = false; Integer saltBucketNum = null; @@ -1439,6 +1447,7 @@ public class MetaDataClient { boolean isImmutableRows = false; List<PName> physicalNames = Collections.emptyList(); boolean addSaltColumn = false; + boolean rowKeyOrderOptimizable = true; if (parent != null && tableType == PTableType.INDEX) { // Index on view // TODO: Can we support a multi-tenant index directly on a multi-tenant @@ -1464,7 +1473,7 @@ public class MetaDataClient { parentTableName = parent.getTableName().getString(); // Pass through data table sequence number so we can check it hasn't changed PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); - incrementStatement.setString(1, connection.getTenantId() == null ? 
null : connection.getTenantId().getString()); + incrementStatement.setString(1, tenantIdStr); incrementStatement.setString(2, schemaName); incrementStatement.setString(3, parentTableName); incrementStatement.setLong(4, parent.getSequenceNumber()); @@ -1476,7 +1485,7 @@ public class MetaDataClient { // Add row linking from data table row to index table row PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK); - linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString()); + linkStatement.setString(1, tenantIdStr); linkStatement.setString(2, schemaName); linkStatement.setString(3, parentTableName); linkStatement.setString(4, tableName); @@ -1589,6 +1598,12 @@ public class MetaDataClient { } else { // Propagate property values to VIEW. // TODO: formalize the known set of these properties + // Manually transfer the ROW_KEY_ORDER_OPTIMIZABLE_BYTES from parent as we don't + // want to add this hacky flag to the schema (see PHOENIX-2067). + rowKeyOrderOptimizable = parent.rowKeyOrderOptimizable(); + if (rowKeyOrderOptimizable) { + UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetaData, SchemaUtil.getTableKey(tenantIdStr, schemaName, tableName), clientTimeStamp); + } multiTenant = parent.isMultiTenant(); saltBucketNum = parent.getBucketNum(); isImmutableRows = parent.isImmutableRows(); @@ -1606,7 +1621,7 @@ public class MetaDataClient { // FIXME: not currently used, but see PHOENIX-1367 // as fixing that will require it's usage. PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK); - linkStatement.setString(1, connection.getTenantId() == null ? 
null : connection.getTenantId().getString()); + linkStatement.setString(1, tenantIdStr); linkStatement.setString(2, schemaName); linkStatement.setString(3, tableName); linkStatement.setString(4, parent.getName().getString()); @@ -1629,7 +1644,7 @@ public class MetaDataClient { // Add row linking from data table row to physical table row PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK); for (PName physicalName : physicalNames) { - linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString()); + linkStatement.setString(1, tenantIdStr); linkStatement.setString(2, schemaName); linkStatement.setString(3, tableName); linkStatement.setString(4, physicalName.getString()); @@ -1788,7 +1803,7 @@ public class MetaDataClient { Collections.<PTable>emptyList(), isImmutableRows, Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, - Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType); + Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true); connection.addTable(table); } else if (tableType == PTableType.INDEX && indexId == null) { if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) { @@ -1942,7 +1957,7 @@ public class MetaDataClient { PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames, defaultFamilyName == null ? 
null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType, - indexId, indexType); + indexId, indexType, rowKeyOrderOptimizable); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); return table; @@ -2889,6 +2904,8 @@ public class MetaDataClient { if (code == MutationCode.TABLE_ALREADY_EXISTS) { if (result.getTable() != null) { // To accommodate connection-less update of index state addTableToCache(result); + // Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set + indexRef.setTable(result.getTable()); } } if (newIndexState == PIndexState.BUILDING) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index b983074..8da2206 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -323,4 +323,13 @@ public interface PTable extends PMetaDataEntity { IndexType getIndexType(); PTableStats getTableStats(); int getBaseColumnCount(); + + /** + * Determines whether or not we may optimize out an ORDER BY or do a GROUP BY + * in-place when the optimizer tells us it's possible. This is due to PHOENIX-2067 + * and only applicable for tables using DESC primary key column(s) which have + * not been upgraded. 
+ * @return true if row key order optimizations are possible + */ + boolean rowKeyOrderOptimizable(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index a947bfc..1756c2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -19,7 +19,6 @@ package org.apache.phoenix.schema; import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly; import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly; -import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE; import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN; import java.io.IOException; @@ -126,6 +125,8 @@ public class PTableImpl implements PTable { private IndexType indexType; private PTableStats tableStats = PTableStats.EMPTY_STATS; private int baseColumnCount; + private boolean hasDescVarLengthColumns; + private boolean rowKeyOrderOptimizable; public PTableImpl() { this.indexes = Collections.emptyList(); @@ -195,7 +196,8 @@ public class PTableImpl implements PTable { table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(), indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + table.isWALDisabled(), 
table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException { @@ -203,7 +205,8 @@ public class PTableImpl implements PTable { table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException { @@ -211,7 +214,8 @@ public class PTableImpl implements PTable { table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), - table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), 
table.getViewIndexId(), table.getIndexType(), table.getTableStats(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException { @@ -219,7 +223,8 @@ public class PTableImpl implements PTable { table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), + table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException { @@ -227,7 +232,8 @@ public class PTableImpl implements PTable { table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + isWalDisabled, isMultitenant, storeNulls, 
table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException { @@ -236,7 +242,18 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount()); + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); + } + + public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException { + return new PTableImpl( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), + table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), + table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), + table.getBaseColumnCount(), rowKeyOrderOptimizable); } public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException { @@ -245,28 +262,29 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), 
getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats, table.getBaseColumnCount()); + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats, + table.getBaseColumnCount(), table.rowKeyOrderOptimizable()); } public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, - boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException { + boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, - indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT); + indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable); } public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, 
List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, - boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount) + boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, - defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount); + defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable); } private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, @@ -274,10 +292,10 @@ public class PTableImpl implements PTable { PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, - PTableStats stats, int baseColumnCount) throws SQLException { + PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException { init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, - viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount); + viewExpression, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable); } @Override @@ -305,7 +323,7 @@ public class PTableImpl implements PTable { PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, - IndexType indexType , int baseColumnCount) throws SQLException { + IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException { Preconditions.checkNotNull(schemaName); Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE + @@ -335,6 +353,7 @@ public class PTableImpl implements PTable { this.viewIndexId = viewIndexId; this.indexType = indexType; this.tableStats = stats; + this.rowKeyOrderOptimizable = rowKeyOrderOptimizable; List<PColumn> pkColumns; PColumn[] allColumns; @@ -386,7 +405,8 @@ public class PTableImpl implements PTable { for (PColumn column : allColumns) { PName familyName = column.getFamilyName(); if (familyName == null) { - pkColumns.add(column); + hasDescVarLengthColumns |= (column.getSortOrder() == SortOrder.DESC && !column.getDataType().isFixedWidth()); + pkColumns.add(column); } if (familyName == null) { estimatedSize += column.getEstimatedSize(); // PK columns @@ -401,6 +421,7 @@ public class PTableImpl implements PTable { } } this.pkColumns = ImmutableList.copyOf(pkColumns); + builder.rowKeyOrderOptimizable(this.rowKeyOrderOptimizable()); // after hasDescVarLengthColumns is calculated this.rowKeySchema = builder.build(); estimatedSize += rowKeySchema.getEstimatedSize(); Iterator<Map.Entry<PName,List<PColumn>>> 
iterator = familyMap.entrySet().iterator(); @@ -500,18 +521,22 @@ public class PTableImpl implements PTable { List<PColumn> columns = getPKColumns(); int nColumns = columns.size(); PDataType type = null; + SortOrder sortOrder = null; + boolean wasNull = false; while (i < nValues && i < nColumns) { // Separate variable length column values in key with zero byte if (type != null && !type.isFixedWidth()) { - os.write(SEPARATOR_BYTE); + os.write(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable(), wasNull, sortOrder)); } PColumn column = columns.get(i); + sortOrder = column.getSortOrder(); type = column.getDataType(); // This will throw if the value is null and the type doesn't allow null byte[] byteValue = values[i++]; if (byteValue == null) { byteValue = ByteUtil.EMPTY_BYTE_ARRAY; } + wasNull = byteValue.length == 0; // An empty byte array return value means null. Do this, // since a type may have muliple representations of null. // For example, VARCHAR treats both null and an empty string @@ -529,11 +554,14 @@ public class PTableImpl implements PTable { } os.write(byteValue, 0, byteValue.length); } + // Need trailing byte for DESC columns + if (type != null && !type.isFixedWidth() && SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable(), wasNull, sortOrder) == QueryConstants.DESC_SEPARATOR_BYTE) { + os.write(QueryConstants.DESC_SEPARATOR_BYTE); + } // If some non null pk values aren't set, then throw if (i < nColumns) { PColumn column = columns.get(i); - type = column.getDataType(); - if (type.isFixedWidth() || !column.isNullable()) { + if (column.getDataType().isFixedWidth() || !column.isNullable()) { throw new ConstraintViolationException(name.getString() + "." 
+ column.getName().getString() + " may not be null"); } } @@ -977,12 +1005,16 @@ public class PTableImpl implements PTable { baseColumnCount = table.getBaseColumnCount(); } + boolean rowKeyOrderOptimizable = false; + if (table.hasRowKeyOrderOptimizable()) { + rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable(); + } try { PTableImpl result = new PTableImpl(); result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName, (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL, - multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount); + multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable); return result; } catch (SQLException e) { throw new RuntimeException(e); // Impossible @@ -1072,6 +1104,7 @@ public class PTableImpl implements PTable { } } builder.setBaseColumnCount(table.getBaseColumnCount()); + builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable()); return builder.build(); } @@ -1095,4 +1128,9 @@ public class PTableImpl implements PTable { public int getBaseColumnCount() { return baseColumnCount; } + + @Override + public boolean rowKeyOrderOptimizable() { + return rowKeyOrderOptimizable || !hasDescVarLengthColumns; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java index e5aa571..9d86dd6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java @@ -17,13 +17,12 @@ */ package org.apache.phoenix.schema; -import 
static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE; - import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.SchemaUtil; /** @@ -37,17 +36,19 @@ import org.apache.phoenix.query.QueryConstants; * @since 0.1 */ public class RowKeySchema extends ValueSchema { - public static final RowKeySchema EMPTY_SCHEMA = new RowKeySchema(0,Collections.<Field>emptyList()) + public static final RowKeySchema EMPTY_SCHEMA = new RowKeySchema(0,Collections.<Field>emptyList(), true) ; public RowKeySchema() { } - protected RowKeySchema(int minNullable, List<Field> fields) { - super(minNullable, fields); + protected RowKeySchema(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) { + super(minNullable, fields, rowKeyOrderOptimizable); } public static class RowKeySchemaBuilder extends ValueSchemaBuilder { + private boolean rowKeyOrderOptimizable = false; + public RowKeySchemaBuilder(int maxFields) { super(maxFields); setMaxFields(maxFields); @@ -59,13 +60,22 @@ public class RowKeySchema extends ValueSchema { return this; } + public RowKeySchemaBuilder rowKeyOrderOptimizable(boolean rowKeyOrderOptimizable) { + this.rowKeyOrderOptimizable = rowKeyOrderOptimizable; + return this; + } + @Override public RowKeySchema build() { List<Field> condensedFields = buildFields(); - return new RowKeySchema(this.minNullable, condensedFields); + return new RowKeySchema(this.minNullable, condensedFields, rowKeyOrderOptimizable); } } + public boolean rowKeyOrderOptimizable() { + return rowKeyOrderOptimizable; + } + public int getMaxFields() { return this.getMinNullable(); } @@ -148,13 +158,19 @@ public class RowKeySchema extends ValueSchema { if (field.getDataType().isFixedWidth()) { ptr.set(ptr.get(),ptr.getOffset(), field.getByteSize()); } else { - if (position+1 == getFieldCount() ) { // Last field has no terminator - ptr.set(ptr.get(), 
ptr.getOffset(), maxOffset - ptr.getOffset()); + if (position+1 == getFieldCount() ) { + // Last field has no terminator unless it's descending sort order + int len = maxOffset - ptr.getOffset(); + ptr.set(ptr.get(), ptr.getOffset(), maxOffset - ptr.getOffset() - (SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, len == 0, field) == QueryConstants.DESC_SEPARATOR_BYTE ? 1 : 0)); } else { byte[] buf = ptr.get(); int offset = ptr.getOffset(); - while (offset < maxOffset && buf[offset] != SEPARATOR_BYTE) { - offset++; + // First byte + if (offset < maxOffset && buf[offset] != QueryConstants.SEPARATOR_BYTE) { + byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, false, field); + do { + offset++; + } while (offset < maxOffset && buf[offset] != sepByte); } ptr.set(buf, ptr.getOffset(), offset - ptr.getOffset()); } @@ -204,8 +220,12 @@ public class RowKeySchema extends ValueSchema { if (!field.getDataType().isFixedWidth()) { byte[] buf = ptr.get(); int offset = ptr.getOffset()-1-offsetAdjustment; - while (offset > minOffset /* sanity check*/ && buf[offset] != QueryConstants.SEPARATOR_BYTE) { - offset--; + // Separator always zero byte if zero length + if (offset > minOffset && buf[offset] != QueryConstants.SEPARATOR_BYTE) { + byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, false, field); + do { + offset--; + } while (offset > minOffset && buf[offset] != sepByte); } if (offset == minOffset) { // shouldn't happen ptr.set(buf, minOffset, ptr.getOffset()-minOffset-1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java index 03b2e6a..2a59e01 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.schema; -import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -28,6 +26,7 @@ import java.util.List; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.ByteUtil; @@ -150,6 +149,9 @@ public class RowKeyValueAccessor implements Writable { ByteUtil.serializeVIntArray(output, offsets, length); } + private static boolean isSeparatorByte(byte b) { + return b == QueryConstants.SEPARATOR_BYTE || b == QueryConstants.DESC_SEPARATOR_BYTE; + } /** * Calculate the byte offset in the row key to the start of the PK column value * @param keyBuffer the byte array of the row key @@ -164,7 +166,7 @@ public class RowKeyValueAccessor implements Writable { } else { // Else, a negative offset is the number of variable length values to skip while (offset++ < 0) { // FIXME: keyOffset < keyBuffer.length required because HBase passes bogus keys to filter to position scan (HBASE-6562) - while (keyOffset < keyBuffer.length && keyBuffer[keyOffset++] != SEPARATOR_BYTE) { + while (keyOffset < keyBuffer.length && !isSeparatorByte(keyBuffer[keyOffset++])) { } } } @@ -181,11 +183,11 @@ public class RowKeyValueAccessor implements Writable { */ public int getLength(byte[] keyBuffer, int keyOffset, int maxOffset) { if (!hasSeparator) { - return maxOffset - keyOffset; + return maxOffset - keyOffset - (keyBuffer[maxOffset-1] == QueryConstants.DESC_SEPARATOR_BYTE ? 
1 : 0); } int offset = keyOffset; // FIXME: offset < maxOffset required because HBase passes bogus keys to filter to position scan (HBASE-6562) - while (offset < maxOffset && keyBuffer[offset] != SEPARATOR_BYTE) { + while (offset < maxOffset && !isSeparatorByte(keyBuffer[offset])) { offset++; } return offset - keyOffset; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java index 7660ffe..a4b40f3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java @@ -49,12 +49,20 @@ public abstract class ValueSchema implements Writable { private boolean isFixedLength; private boolean isMaxLength; private int minNullable; + // Only applicable for RowKeySchema (and only due to PHOENIX-2067), but + // added here as this is where serialization is done (and we need to + // maintain the same serialization shape for b/w compat). 
+ protected boolean rowKeyOrderOptimizable; public ValueSchema() { } protected ValueSchema(int minNullable, List<Field> fields) { - init(minNullable, fields); + this(minNullable, fields, true); + } + + protected ValueSchema(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) { + init(minNullable, fields, rowKeyOrderOptimizable); } @Override @@ -68,7 +76,8 @@ public abstract class ValueSchema implements Writable { SizedUtil.ARRAY_SIZE + count * Field.ESTIMATED_SIZE + SizedUtil.sizeOfArrayList(count); } - private void init(int minNullable, List<Field> fields) { + private void init(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) { + this.rowKeyOrderOptimizable = rowKeyOrderOptimizable; this.minNullable = minNullable; this.fields = ImmutableList.copyOf(fields); int estimatedLength = 0; @@ -324,14 +333,6 @@ public abstract class ValueSchema implements Writable { return size; } - public void serialize(DataOutput output) throws IOException { - WritableUtils.writeVInt(output, minNullable); - WritableUtils.writeVInt(output, fields.size()); - for (int i = 0; i < fields.size(); i++) { - fields.get(i).write(output); - } - } - public Field getField(int position) { return fields.get(fieldIndexByPosition[position]); } @@ -366,19 +367,24 @@ public abstract class ValueSchema implements Writable { public void readFields(DataInput in) throws IOException { int minNullable = WritableUtils.readVInt(in); int nFields = WritableUtils.readVInt(in); + boolean rowKeyOrderOptimizable = false; + if (nFields < 0) { + rowKeyOrderOptimizable = true; + nFields *= -1; + } List<Field> fields = Lists.newArrayListWithExpectedSize(nFields); for (int i = 0; i < nFields; i++) { Field field = new Field(); field.readFields(in); fields.add(field); } - init(minNullable, fields); + init(minNullable, fields, rowKeyOrderOptimizable); } @Override public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, minNullable); - WritableUtils.writeVInt(out, 
fields.size()); + WritableUtils.writeVInt(out, fields.size() * (rowKeyOrderOptimizable ? -1 : 1)); for (int i = 0; i < fields.size(); i++) { fields.get(i).write(out); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index ebb7d1f..e692470 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -60,10 +60,10 @@ public class StatisticsUtil { int offset = 0; System.arraycopy(table, 0, rowKey, offset, table.length); offset += table.length; - rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; + rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength()); offset += fam.getLength(); - rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; + rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC System.arraycopy(region, 0, rowKey, offset, region.length); return rowKey; }
