Repository: phoenix Updated Branches: refs/heads/encodecolumns2 1d2e2f51f -> 452155ce9
PHOENIX-3667 Optimize BooleanExpressionFilter for tables with encoded columns Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/452155ce Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/452155ce Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/452155ce Branch: refs/heads/encodecolumns2 Commit: 452155ce90c6c050cf26f64e69105e40d8f5a837 Parents: 1d2e2f5 Author: Samarth <[email protected]> Authored: Fri Feb 24 16:52:42 2017 -0800 Committer: Samarth <[email protected]> Committed: Fri Feb 24 16:52:42 2017 -0800 ---------------------------------------------------------------------- .../apache/phoenix/compile/WhereCompiler.java | 10 +- ...EncodedQualifiersColumnProjectionFilter.java | 151 ++++++++ .../MultiEncodedCQKeyValueComparisonFilter.java | 369 +++++++++++++++++++ .../filter/MultiKeyValueComparisonFilter.java | 3 +- .../phoenix/iterate/BaseResultIterators.java | 16 +- .../apache/phoenix/util/EncodedColumnsUtil.java | 7 + .../java/org/apache/phoenix/util/ScanUtil.java | 19 +- .../phoenix/compile/QueryCompilerTest.java | 3 +- .../compile/SelectStatementRewriterTest.java | 11 +- .../phoenix/compile/WhereCompilerTest.java | 19 +- .../java/org/apache/phoenix/util/TestUtil.java | 6 + 11 files changed, 594 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 3026514..fae7a5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -17,6 +17,8 @@ */ package 
org.apache.phoenix.compile; +import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter; + import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Collections; @@ -36,6 +38,7 @@ import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter; +import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.RowKeyComparisonFilter; import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter; @@ -53,6 +56,7 @@ import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; @@ -264,6 +268,8 @@ public class WhereCompiler { return null; } }); + QualifierEncodingScheme encodingScheme = context.getCurrentTable().getTable().getEncodingScheme(); + ImmutableStorageScheme storageScheme = context.getCurrentTable().getTable().getImmutableStorageScheme(); switch (counter.getCount()) { case NONE: PTable table = context.getResolver().getTables().get(0).getTable(); @@ -276,7 +282,9 @@ public class WhereCompiler { filter = disambiguateWithFamily ? new SingleCFCQKeyValueComparisonFilter(whereClause) : new SingleCQKeyValueComparisonFilter(whereClause); break; case MULTIPLE: - filter = disambiguateWithFamily ? 
new MultiCFCQKeyValueComparisonFilter(whereClause) : new MultiCQKeyValueComparisonFilter(whereClause); + filter = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) ? new MultiEncodedCQKeyValueComparisonFilter( + whereClause, encodingScheme) : (disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter( + whereClause) : new MultiCQKeyValueComparisonFilter(whereClause)); break; } scan.setFilter(filter); http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java new file mode 100644 index 0000000..cfacb4f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.filter; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_BYTES; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.BitSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +public class EncodedQualifiersColumnProjectionFilter extends FilterBase implements Writable { + + private byte[] emptyCFName; + private BitSet trackedColumns; + private QualifierEncodingScheme encodingScheme; + private Set<byte[]> conditionOnlyCfs; + + public EncodedQualifiersColumnProjectionFilter() {} + + public EncodedQualifiersColumnProjectionFilter(byte[] emptyCFName, BitSet trackedColumns, Set<byte[]> conditionCfs, QualifierEncodingScheme encodingScheme) { + checkArgument(encodingScheme != NON_ENCODED_QUALIFIERS, "Filter can only be used for encoded qualifiers"); + this.emptyCFName = emptyCFName; + this.trackedColumns = trackedColumns; + this.encodingScheme = encodingScheme; + this.conditionOnlyCfs = conditionCfs; + } + + @Override + public void readFields(DataInput input) throws IOException { + this.emptyCFName = WritableUtils.readCompressedByteArray(input); + int 
bitsetLongArraySize = WritableUtils.readVInt(input); + long[] bitsetLongArray = new long[bitsetLongArraySize]; + for (int i = 0; i < bitsetLongArraySize; i++) { + bitsetLongArray[i] = WritableUtils.readVLong(input); + } + this.trackedColumns = BitSet.valueOf(bitsetLongArray); + this.encodingScheme = QualifierEncodingScheme.values()[WritableUtils.readVInt(input)]; + int conditionOnlyCfsSize = WritableUtils.readVInt(input); + this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + while (conditionOnlyCfsSize > 0) { + this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input)); + conditionOnlyCfsSize--; + } + } + + @Override + public void write(DataOutput output) throws IOException { + WritableUtils.writeCompressedByteArray(output, this.emptyCFName); + long[] longArrayOfBitSet = trackedColumns.toLongArray(); + WritableUtils.writeVInt(output, longArrayOfBitSet.length); + for (Long l : longArrayOfBitSet) { + WritableUtils.writeVLong(output, l); + } + WritableUtils.writeVInt(output, encodingScheme.ordinal()); + WritableUtils.writeVInt(output, this.conditionOnlyCfs.size()); + for (byte[] f : this.conditionOnlyCfs) { + WritableUtils.writeCompressedByteArray(output, f); + } + } + + @Override + public byte[] toByteArray() throws IOException { + return Writables.getBytes(this); + } + + public static EncodedQualifiersColumnProjectionFilter parseFrom(final byte [] pbBytes) throws DeserializationException { + try { + return (EncodedQualifiersColumnProjectionFilter)Writables.getWritable(pbBytes, new EncodedQualifiersColumnProjectionFilter()); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + @Override + public void filterRowCells(List<Cell> kvs) throws IOException { + if (kvs.isEmpty()) return; + Cell firstKV = kvs.get(0); + Iterables.removeIf(kvs, new Predicate<Cell>() { + @Override + public boolean apply(Cell kv) { + int qualifier = encodingScheme.decode(kv.getQualifierArray(), kv.getQualifierOffset(), 
kv.getQualifierLength()); + return !trackedColumns.get(qualifier); + } + }); + if (kvs.isEmpty()) { + kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(), + this.emptyCFName, 0, this.emptyCFName.length, ENCODED_EMPTY_COLUMN_BYTES, 0, + ENCODED_EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); + } + } + + @Override + public boolean hasFilterRow() { + return true; + } + + @Override + public boolean isFamilyEssential(byte[] name) { + return conditionOnlyCfs.isEmpty() || this.conditionOnlyCfs.contains(name); + } + + @Override + public String toString() { + return ""; + } + + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + + interface ColumnTracker { + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java new file mode 100644 index 0000000..00e662f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.filter; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.BitSet; +import java.util.NoSuchElementException; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.tuple.BaseTuple; + +/** + * Filter used for tables that use number based column qualifiers generated by one of the encoding schemes in + * {@link QualifierEncodingScheme}. Because the qualifiers are number based, instead of using a map of cells to track + * the columns that have been found, we can use an array of cells where the index into the array would be derived by the + * number based column qualifier. See {@link EncodedCQIncrementalResultTuple}. 
The filter is also given the min/max qualifier of the projected and filtered columns (see + * setMinMaxQualifierRange) so that cells whose qualifier is smaller than the minimum can be skipped. Seeking to the next + * row once a qualifier greater than the maximum is seen is currently disabled (commented out in filterKeyValue). + */ +public class MultiEncodedCQKeyValueComparisonFilter extends BooleanExpressionFilter { + // Smallest qualifier for the columns that are being projected and filtered on + private int minQualifier; + + // Largest qualifier for the columns that are being projected and filtered on + private int maxQualifier; + + private QualifierEncodingScheme encodingScheme; + + // Smallest qualifier for the columns in where expression + private int whereExpressionMinQualifier; + + // Largest qualifier for the columns in where expression + private int whereExpressionMaxQualifier; + + private FilteredKeyValueHolder filteredKeyValues; + + // BitSet to track the qualifiers in where expression that we expect to find while filtering a row + private BitSet whereExpressionQualifiers; + + // Set to track the column families of the columns in where expression + private TreeSet<byte[]> cfSet; + + // Result of evaluating the where expression while filtering the key values of a row; null until it is known + private Boolean matchedColumn; + + // Tuple used to store the relevant key values found while filtering a row + private EncodedCQIncrementalResultTuple inputTuple = new EncodedCQIncrementalResultTuple(); + + // Member variable to cache the cardinality (number of set bits) of whereExpressionQualifiers + private int expectedCardinality; + + private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; + + public MultiEncodedCQKeyValueComparisonFilter() {} + + public MultiEncodedCQKeyValueComparisonFilter(Expression expression, QualifierEncodingScheme scheme) { + super(expression); + checkArgument(scheme != NON_ENCODED_QUALIFIERS, "Filter can only be used for encoded qualifiers"); + this.encodingScheme = scheme; + initFilter(expression); + } + + private final class 
FilteredKeyValueHolder { + // Cell values corresponding to columns in where expression that were found while filtering a row. + private Cell[] filteredCells; + + // BitSet to track whether qualifiers in where expression were found when filtering a row + private BitSet filteredQualifiers; + + // Using an explicit counter instead of relying on the cardinality of the bitset as computing the + // cardinality could be slightly more expensive than just incrementing an integer + private int numKeyValues; + + private FilteredKeyValueHolder(int size) { + filteredCells = new Cell[size]; + filteredQualifiers = new BitSet(size); + } + + private void setCell(int qualifier, Cell c) { + int index = qualifier - whereExpressionMinQualifier; + filteredCells[index] = c; + filteredQualifiers.set(index); + numKeyValues++; + } + + private Cell getCell(int qualifier) { + int index = qualifier - whereExpressionMinQualifier; + return filteredQualifiers.get(index) ? filteredCells[index] : null; + } + + private void clear() { + // Note here that we are only clearing out the filteredQualifiers bitset. We are not setting all the + // entries in filteredCells to null or allocating a new Cell array as that would be expensive. + filteredQualifiers.clear(); + numKeyValues = 0; + } + + /** + * Returns the cell of the index-th (0-based) qualifier found so far, in increasing qualifier order; throws + * NoSuchElementException if there is no such cell. Linear scan, kept only for the tuple's getValue(index) method. 
+ */ + public Cell getCellAtIndex(int index) { + int bitIndex; + for (bitIndex = filteredQualifiers.nextSetBit(0); bitIndex >= 0 && index > 0; bitIndex = filteredQualifiers + .nextSetBit(bitIndex + 1)) { + index--; // each step skips one found qualifier; stop on the one at the requested position + } + if (bitIndex < 0) { throw new NoSuchElementException(); } + return filteredCells[bitIndex]; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(100); + // Only visit slots whose bit is set: the others are null or hold stale cells, since clear() keeps the array + for (int i = filteredQualifiers.nextSetBit(0); i >= 0; i = filteredQualifiers.nextSetBit(i + 1)) { + sb.append(filteredCells[i].toString()); + } + return sb.toString(); + } + + private boolean allColumnsFound() { + return numKeyValues == expectedCardinality; + } + + private int numKeyValues() { + return numKeyValues; + } + + } + + private void initFilter(Expression expression) { + cfSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + final BitSet expressionQualifiers = new BitSet(20); + final Pair<Integer, Integer> range = new Pair<>(); + ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { + @Override + public Void visit(KeyValueColumnExpression expression) { + int qualifier = encodingScheme.decode(expression.getColumnQualifier()); + if (range.getFirst() == null) { + range.setFirst(qualifier); + range.setSecond(qualifier); + } else if (qualifier < range.getFirst()) { + range.setFirst(qualifier); + } else if (qualifier > range.getSecond()) { + range.setSecond(qualifier); + } + cfSet.add(expression.getColumnFamily()); + expressionQualifiers.set(qualifier); + return null; + } + }; + expression.accept(visitor); + // Set min and max qualifiers for columns in the where expression + whereExpressionMinQualifier = range.getFirst(); + whereExpressionMaxQualifier = range.getSecond(); + + int size = whereExpressionMaxQualifier - whereExpressionMinQualifier + 1; + filteredKeyValues = new FilteredKeyValueHolder(size); + + // Initialize the bitset and mark the qualifiers for columns in where expression + whereExpressionQualifiers = new BitSet(size); + for (int i = 
whereExpressionMinQualifier; i <= whereExpressionMaxQualifier; i++) { + if (expressionQualifiers.get(i)) { + whereExpressionQualifiers.set(i - whereExpressionMinQualifier); + } + } + expectedCardinality = whereExpressionQualifiers.cardinality(); + } + + private boolean isQualifierForColumnInWhereExpression(int qualifier) { + return qualifier >= whereExpressionMinQualifier ? whereExpressionQualifiers.get(qualifier - whereExpressionMinQualifier) : false; + } + + @Override + public ReturnCode filterKeyValue(Cell cell) { + if (Boolean.TRUE.equals(this.matchedColumn)) { + // The where expression already evaluated to true for this row, so all remaining keys pass + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + if (Boolean.FALSE.equals(this.matchedColumn)) { + // The where expression already evaluated to false for this row, so skip to the next row + return ReturnCode.NEXT_ROW; + } + inputTuple.setKey(cell); + int qualifier = encodingScheme.decode(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (isQualifierForColumnInWhereExpression(qualifier)) { + filteredKeyValues.setCell(qualifier, cell); + // We found a new column, so we can re-evaluate + this.matchedColumn = this.evaluate(inputTuple); + if (this.matchedColumn == null) { + if (inputTuple.isImmutable()) { + this.matchedColumn = Boolean.FALSE; + } else { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + } + return this.matchedColumn ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW; + } + // The qualifier is not one of the qualifiers in the expression. So decide whether + // we would need to include it in our result. + if (qualifier < minQualifier) { + // Qualifier is smaller than the minimum expected qualifier. Look at the next column. + return ReturnCode.NEXT_COL; + } + // TODO: I don't think we would ever hit this case of encountering a qualifier greater than what we expect. + // Leaving the code commented out here for future reference. 
+ // if (qualifier > maxQualifier) { + // Qualifier is larger than the max expected qualifier. We are done looking at columns in this row. + // return ReturnCode.NEXT_ROW; + // } + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + + @Override + public boolean filterRow() { + if (this.matchedColumn == null && !inputTuple.isImmutable() && expression.requiresFinalEvaluation()) { + inputTuple.setImmutable(); + this.matchedColumn = this.evaluate(inputTuple); + } + return ! (Boolean.TRUE.equals(this.matchedColumn)); + } + + final class EncodedCQIncrementalResultTuple extends BaseTuple { + private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER); + private boolean isImmutable; + + @Override + public boolean isImmutable() { + return isImmutable || filteredKeyValues.allColumnsFound(); + } + + public void setImmutable() { + this.isImmutable = true; + } + + private void setKey(Cell value) { + keyPtr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength()); + } + + @Override + public void getKey(ImmutableBytesWritable ptr) { + ptr.set(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()); + } + + @Override + public Cell getValue(byte[] cf, byte[] cq) { + int qualifier = encodingScheme.decode(cq); + return filteredKeyValues.getCell(qualifier); + } + + @Override + public String toString() { + return filteredKeyValues.toString(); + } + + @Override + public int size() { + return filteredKeyValues.numKeyValues(); + } + + /** + * This method doesn't perform well and shouldn't be the way of + * getting hold of elements in the tuple. 
+ */ + @Override + public Cell getValue(int index) { + return filteredKeyValues.getCellAtIndex(index); + } + + @Override + public boolean getValue(byte[] family, byte[] qualifier, + ImmutableBytesWritable ptr) { + Cell cell = getValue(family, qualifier); + if (cell == null) + return false; + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + return true; + } + + void reset() { + isImmutable = false; + keyPtr.set(UNITIALIZED_KEY_BUFFER); + } + } + + @Override + public void readFields(DataInput input) throws IOException { + this.minQualifier = WritableUtils.readVInt(input); + this.maxQualifier = WritableUtils.readVInt(input); + this.whereExpressionMinQualifier = WritableUtils.readVInt(input); + this.whereExpressionMaxQualifier = WritableUtils.readVInt(input); + this.encodingScheme = QualifierEncodingScheme.values()[WritableUtils.readVInt(input)]; + super.readFields(input); + initFilter(expression); + } + + @Override + public void write(DataOutput output) throws IOException { + WritableUtils.writeVInt(output, minQualifier); + WritableUtils.writeVInt(output, maxQualifier); + WritableUtils.writeVInt(output, whereExpressionMinQualifier); + WritableUtils.writeVInt(output, whereExpressionMaxQualifier); + WritableUtils.writeVInt(output, encodingScheme.ordinal()); + super.write(output); + } + + public void setMinMaxQualifierRange(Pair<Integer, Integer> minMaxQualifiers) { + this.minQualifier = minMaxQualifiers.getFirst(); + this.maxQualifier = minMaxQualifiers.getSecond(); + } + + public static MultiEncodedCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { + try { + return (MultiEncodedCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new MultiEncodedCQKeyValueComparisonFilter()); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + @Override + public void reset() { + filteredKeyValues.clear(); + matchedColumn = null; + inputTuple.reset(); + super.reset(); + } + + 
@Override + public boolean isFamilyEssential(byte[] name) { + // Only the column families involved in the expression are essential. + // The others are for columns projected in the select expression. + return cfSet.contains(name); + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index 88f707d..00ecd9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -26,7 +26,6 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.ExpressionVisitor; @@ -232,7 +231,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil return ! 
(Boolean.TRUE.equals(this.matchedColumn)); } - @Override + @Override public void reset() { matchedColumn = null; inputTuple.reset(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 0e87544..33e2f9e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -26,6 +26,8 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIM import static org.apache.phoenix.schema.PTable.IndexType.LOCAL; import static org.apache.phoenix.schema.PTableType.INDEX; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter; +import static org.apache.phoenix.util.ScanUtil.hasDynamicColumns; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -33,6 +35,7 @@ import java.io.DataInputStream; import java.io.EOFException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -74,6 +77,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; +import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.parse.FilterableStatement; @@ -85,6 +89,7 @@ import 
org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.ViewType; @@ -258,6 +263,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (range != null) { scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst())); scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond())); + ScanUtil.setQualifierRangesOnFilter(scan, range); } } if (optimizeProjection) { @@ -340,6 +346,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>(); Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); int referencedCfCount = familyMap.size(); + QualifierEncodingScheme encodingScheme = table.getEncodingScheme(); + ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme(); + BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? 
new BitSet(10) : null; boolean filteredColumnNotInProjection = false; for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { byte[] filteredFamily = whereCol.getFirst(); @@ -380,6 +389,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result cols = new TreeSet<ImmutableBytesPtr>(); for (byte[] q : qs) { cols.add(new ImmutableBytesPtr(q)); + if (trackedColumnsBitset != null) { + int qualifier = encodingScheme.decode(q); + trackedColumnsBitset.set(qualifier); + } } } columnsTracker.put(cf, cols); @@ -428,7 +441,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // in the scan in this case. We still want the other optimization that causes // the ExplicitColumnTracker not to be used, though. if (!statement.isAggregate() && filteredColumnNotInProjection) { - ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), + ScanUtil.andFilterAtEnd(scan, + trackedColumnsBitset != null ? 
new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme()))); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java index efe7da2..fb6baf0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java @@ -195,4 +195,11 @@ public class EncodedColumnsUtil { } return number < QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; } + + public static boolean isPossibleToUseEncodedCQFilter(QualifierEncodingScheme encodingScheme, + ImmutableStorageScheme storageScheme) { + return EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) + && storageScheme == ImmutableStorageScheme.ONE_CELL_PER_COLUMN; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index c9e76c5..1fdc73b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import 
org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableComparator; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.ScanRanges; @@ -54,6 +55,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; import org.apache.phoenix.filter.BooleanExpressionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; +import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -267,6 +269,21 @@ public class ScanUtil { scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,Arrays.asList(filter, andWithFilter))); } } + + public static void setQualifierRangesOnFilter(Scan scan, Pair<Integer, Integer> minMaxQualifiers) { + Filter filter = scan.getFilter(); + if (filter != null) { + if (filter instanceof FilterList) { + for (Filter f : ((FilterList)filter).getFilters()) { + if (f instanceof MultiEncodedCQKeyValueComparisonFilter) { + ((MultiEncodedCQKeyValueComparisonFilter)f).setMinMaxQualifierRange(minMaxQualifiers); + } + } + } else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) { + ((MultiEncodedCQKeyValueComparisonFilter)filter).setMinMaxQualifierRange(minMaxQualifiers); + } + } + } public static void setTimeRange(Scan scan, long ts) { try { @@ -908,5 +925,5 @@ public class ScanUtil { public static boolean isIndexRebuild(Scan scan) { return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null; } - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 12d98f0..c8f2276 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -59,6 +59,7 @@ import org.apache.phoenix.expression.aggregator.CountAggregator; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.expression.function.TimeUnit; import org.apache.phoenix.filter.ColumnProjectionFilter; +import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; @@ -2341,7 +2342,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan); while (iterator.hasNext()) { Filter filter = iterator.next(); - if (filter instanceof ColumnProjectionFilter) { + if (filter instanceof EncodedQualifiersColumnProjectionFilter) { return true; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/test/java/org/apache/phoenix/compile/SelectStatementRewriterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/SelectStatementRewriterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/SelectStatementRewriterTest.java index f7b6ffd..9da5d9d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/SelectStatementRewriterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/SelectStatementRewriterTest.java @@ -17,10 +17,11 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS; import static 
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.and; import static org.apache.phoenix.util.TestUtil.constantComparison; -import static org.apache.phoenix.util.TestUtil.multiKVFilter; +import static org.apache.phoenix.util.TestUtil.multiEncodedKVFilter; import static org.apache.phoenix.util.TestUtil.singleKVFilter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -79,14 +80,14 @@ public class SelectStatementRewriterTest extends BaseConnectionlessQueryTest { String query = "select * from atable where organization_id='" + tenantId + "' and a_integer=0 and a_string='foo'"; Filter filter = compileStatement(query); assertEquals( - multiKVFilter(and( + multiEncodedKVFilter(and( constantComparison( CompareOp.EQUAL, A_INTEGER, 0), constantComparison( CompareOp.EQUAL, A_STRING, "foo") - )), + ), FOUR_BYTE_QUALIFIERS), filter); } @@ -103,14 +104,14 @@ public class SelectStatementRewriterTest extends BaseConnectionlessQueryTest { String query = "select * from atable where a_integer=0 and a_string='foo'"; Filter filter = compileStatement(query); assertEquals( - multiKVFilter(and( + multiEncodedKVFilter(and( constantComparison( CompareOp.EQUAL, A_INTEGER, 0), constantComparison( CompareOp.EQUAL, A_STRING, "foo") - )), + ), FOUR_BYTE_QUALIFIERS), filter); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java index e10b940..a295ad6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.compile; +import 
static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS; import static org.apache.phoenix.util.TestUtil.ATABLE_NAME; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.and; @@ -25,7 +26,7 @@ import static org.apache.phoenix.util.TestUtil.bindParams; import static org.apache.phoenix.util.TestUtil.columnComparison; import static org.apache.phoenix.util.TestUtil.constantComparison; import static org.apache.phoenix.util.TestUtil.in; -import static org.apache.phoenix.util.TestUtil.multiKVFilter; +import static org.apache.phoenix.util.TestUtil.multiEncodedKVFilter; import static org.apache.phoenix.util.TestUtil.not; import static org.apache.phoenix.util.TestUtil.or; import static org.apache.phoenix.util.TestUtil.singleKVFilter; @@ -262,10 +263,10 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { Scan scan = plan.getContext().getScan(); Filter filter = scan.getFilter(); assertEquals( - multiKVFilter(columnComparison( + multiEncodedKVFilter(columnComparison( CompareOp.EQUAL, A_STRING, - B_STRING)), + B_STRING), FOUR_BYTE_QUALIFIERS), filter); } @@ -297,7 +298,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { Filter filter = scan.getFilter(); assertEquals( - multiKVFilter(and( + multiEncodedKVFilter(and( constantComparison( CompareOp.EQUAL, A_INTEGER, @@ -305,7 +306,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { constantComparison( CompareOp.EQUAL, A_STRING, - "foo"))), + "foo")), FOUR_BYTE_QUALIFIERS), filter); } @@ -944,7 +945,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { Expression aInteger = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_INTEGER").getPosition()).newColumnExpression(); Expression aString = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_STRING").getPosition()).newColumnExpression(); assertEquals( - multiKVFilter(and( + 
multiEncodedKVFilter(and( constantComparison( CompareOp.EQUAL, aInteger, @@ -952,7 +953,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { constantComparison( CompareOp.EQUAL, aString, - "foo"))), + "foo")), FOUR_BYTE_QUALIFIERS), filter); byte[] startRow = PVarchar.INSTANCE.toBytes(tenantId + tenantTypeId); @@ -978,7 +979,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { Expression aInteger = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_INTEGER").getPosition()).newColumnExpression(); Expression aString = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_STRING").getPosition()).newColumnExpression(); assertEquals( - multiKVFilter(and( + multiEncodedKVFilter(and( constantComparison( CompareOp.EQUAL, aInteger, @@ -986,7 +987,7 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest { constantComparison( CompareOp.EQUAL, aString, - "foo"))), + "foo")), FOUR_BYTE_QUALIFIERS), filter); byte[] startRow = PVarchar.INSTANCE.toBytes(tenantId); http://git-wip-us.apache.org/repos/asf/phoenix/blob/452155ce/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index ead712b..c427c8d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -91,6 +91,7 @@ import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.expression.function.SubstrFunction; import org.apache.phoenix.expression.function.SumAggregateFunction; import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter; +import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.MultiKeyValueComparisonFilter; import 
org.apache.phoenix.filter.RowKeyComparisonFilter; import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter; @@ -109,6 +110,7 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PLongColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -361,6 +363,10 @@ public class TestUtil { public static MultiKeyValueComparisonFilter multiKVFilter(Expression e) { return new MultiCQKeyValueComparisonFilter(e); } + + public static MultiEncodedCQKeyValueComparisonFilter multiEncodedKVFilter(Expression e, QualifierEncodingScheme encodingScheme) { + return new MultiEncodedCQKeyValueComparisonFilter(e, encodingScheme); + } public static Expression and(Expression... expressions) { return new AndExpression(Arrays.asList(expressions));
