http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
new file mode 100644
index 0000000..14b488d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Evaluates a fixed array of expressions against a tuple and encodes the results as one
+ * byte array (see {@link #projectResults(Tuple)}). A projector can be shipped on a
+ * {@link Scan} attribute and rebuilt on the receiving side, and two projected values can
+ * be combined with {@link #mergeProjectedValue}.
+ */
+public class TupleProjector {
+    /** Column family under which a projected value is exposed as a single {@link KeyValue}. */
+    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+    /** Column qualifier (empty) paired with {@link #VALUE_COLUMN_FAMILY}. */
+    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+
+    /** Name of the scan attribute that carries the serialized projector. */
+    private static final String SCAN_PROJECTOR = "scanProjector";
+
+    private final KeyValueSchema schema;
+    private final Expression[] expressions;
+    private ValueBitSet valueSet;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+    /**
+     * Builds a projector with one schema field and one expression per column projector of
+     * the given row projector, in order.
+     */
+    public TupleProjector(RowProjector rowProjector) {
+        List<? extends ColumnProjector> columnProjectors = rowProjector.getColumnProjectors();
+        int count = columnProjectors.size();
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        expressions = new Expression[count];
+        for (int i = 0; i < count; i++) {
+            Expression expression = columnProjectors.get(i).getExpression();
+            builder.addField(expression);
+            expressions[i] = expression;
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /**
+     * Builds a projector over the non-PK columns of the projected table, using each
+     * column's source expression.
+     */
+    public TupleProjector(ProjectedPTableWrapper projected) {
+        List<PColumn> columns = projected.getTable().getColumns();
+        expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+        // we do not count minNullableIndex for we might do later merge.
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        int i = 0;
+        for (PColumn column : projected.getTable().getColumns()) {
+            if (!SchemaUtil.isPKColumn(column)) {
+                builder.addField(column);
+                expressions[i++] = projected.getSourceExpression(column);
+            }
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /** Used when rebuilding a projector from its serialized form. */
+    private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
+        this.schema = schema;
+        this.expressions = expressions;
+        this.valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /** Replaces the bit set used by {@link #projectResults(Tuple)}. */
+    public void setValueBitSet(ValueBitSet bitSet) {
+        this.valueSet = bitSet;
+    }
+
+    /**
+     * Writes the projector's schema and expressions into the {@code scanProjector}
+     * attribute of the scan. Each expression is preceded by its {@link ExpressionType}
+     * ordinal so it can be re-instantiated on read.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projector.schema.write(output);
+            int count = projector.expressions.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+                projector.expressions[i].write(output);
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    /**
+     * Rebuilds the projector stored by {@link #serializeProjectorIntoScan(Scan, TupleProjector)}.
+     *
+     * @return the projector, or {@code null} if the scan carries no projector attribute
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    public static TupleProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema schema = new KeyValueSchema();
+            schema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            Expression[] expressions = new Expression[count];
+            for (int i = 0; i < count; i++) {
+                int ordinal = WritableUtils.readVInt(input);
+                expressions[i] = ExpressionType.values()[ordinal].newInstance();
+                expressions[i].readFields(input);
+            }
+            return new TupleProjector(schema, expressions);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Tuple holding a row key, a timestamp and one projected value. It always reports a
+     * size of one; the value is exposed under {@link #VALUE_COLUMN_FAMILY} and
+     * {@link #VALUE_COLUMN_QUALIFIER}.
+     */
+    public static class ProjectedValueTuple extends BaseTuple {
+        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+        private long timestamp;
+        private byte[] projectedValue;
+        private int bitSetLen;
+        private KeyValue keyValue;
+
+        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+            this.timestamp = timestamp;
+            this.projectedValue = projectedValue;
+            this.bitSetLen = bitSetLen;
+        }
+
+        public ImmutableBytesWritable getKeyPtr() {
+            return keyPtr;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public byte[] getProjectedValue() {
+            return projectedValue;
+        }
+
+        /** Number of bytes at the end of the projected value occupied by the bit set. */
+        public int getBitSetLength() {
+            return bitSetLen;
+        }
+
+        @Override
+        public void getKey(ImmutableBytesWritable ptr) {
+            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+        }
+
+        @Override
+        public KeyValue getValue(int index) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException(Integer.toString(index));
+            }
+            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+        }
+
+        /** Returns the lazily built key value; {@code family} and {@code qualifier} are not consulted. */
+        @Override
+        public KeyValue getValue(byte[] family, byte[] qualifier) {
+            if (keyValue == null) {
+                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+            }
+            return keyValue;
+        }
+
+        /** Points {@code ptr} at the projected value and returns {@code true} regardless of the column asked for. */
+        @Override
+        public boolean getValue(byte[] family, byte[] qualifier,
+                ImmutableBytesWritable ptr) {
+            ptr.set(projectedValue);
+            return true;
+        }
+
+        @Override
+        public boolean isImmutable() {
+            return true;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+    }
+
+    /**
+     * Evaluates the expressions against {@code tuple} and wraps the encoded bytes in a
+     * {@link ProjectedValueTuple} that reuses the row key and timestamp of the tuple's first
+     * key value.
+     */
+    public ProjectedValueTuple projectResults(Tuple tuple) {
+        byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
+        KeyValue base = tuple.getValue(0);
+        return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+    }
+
+    /**
+     * Points {@code ptr} at the projected value of {@code tuple}.
+     *
+     * @throws IOException if the tuple has no value under the projected-value column
+     */
+    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+        boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+        if (!b)
+            throw new IOException("Trying to decode a non-projected value.");
+    }
+
+    /**
+     * Concatenates the values of {@code dest} and {@code src} (each without its trailing
+     * bit set) and appends a bit set that holds the bits of {@code dest} plus the bits of
+     * {@code src} shifted up by {@code offset}.
+     *
+     * @param destBitSet scratch bit set; cleared and overwritten
+     * @param srcBitSet scratch bit set; cleared and overwritten
+     * @param offset position in {@code destBitSet} that bit 0 of {@code src} maps to
+     * @return a new tuple with the key and timestamp of {@code dest}
+     * @throws IOException if {@code src} carries no projected value
+     */
+    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+            Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+        ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+        destBitSet.clear();
+        destBitSet.or(destValue);
+        int origDestBitSetLen = dest.getBitSetLength();
+        ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+        decodeProjectedValue(src, srcValue);
+        srcBitSet.clear();
+        srcBitSet.or(srcValue);
+        int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+        // Inclusive bound: getMaxSetBit() is taken to be the index of the highest bit that can be
+        // set (TODO confirm in ValueBitSet); with '<' that top bit of src was never carried over.
+        for (int i = 0; i <= srcBitSet.getMaxSetBit(); i++) {
+            if (srcBitSet.get(i)) {
+                destBitSet.set(offset + i);
+            }
+        }
+        int destBitSetLen = destBitSet.getEstimatedLength();
+        byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+        int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+        o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+        destBitSet.toBytes(merged, o);
+        ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+    }
+
+    public KeyValueSchema getSchema() {
+        return schema;
+    }
+
+    public Expression[] getExpressions() {
+        return expressions;
+    }
+
+    public ValueBitSet getValueBitSet() {
+        return valueSet;
+    }
+
+    @Override
+    public String toString() {
+        return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java index dcac849..10657e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java @@ -22,8 +22,8 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.visitor.ExpressionVisitor; -import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.PColumn; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java index fefb077..2af99ca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.schema.PArrayDataType; import 
org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.schema.PhoenixArray; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; @@ -37,15 +36,15 @@ public class DistinctValueClientAggregator extends DistinctValueWithCountClientA @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { - if (buffer == null || buffer.length == 0) { + if (cachedResult == null) { Object[] values = new Object[valueVsCount.size()]; int i = 0; for (ImmutableBytesPtr key : valueVsCount.keySet()) { values[i++] = valueType.toObject(key, sortOrder); } - PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values); - buffer = resultType.toBytes(array, sortOrder); + cachedResult = PArrayDataType.instantiatePhoenixArray(valueType, values); } + buffer = resultType.toBytes(cachedResult, sortOrder); ptr.set(buffer); return true; }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..8fd36b3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ * 
+ * Base class for result scanners that aggregate the row count value for rows with
+ * duplicate keys. This result scanner assumes that the results of the inner result
+ * scanner are returned in order of grouping keys.
+ * 
+ */
+public abstract class BaseGroupedAggregatingResultIterator implements
+        AggregatingResultIterator {
+    /** Sentinel buffer; compared by identity to detect that no grouping key has been read yet. */
+    private static final byte[] UNINITIALIZED_KEY_BUFFER = new byte[0];
+    protected final PeekingResultIterator resultIterator;
+    protected final Aggregators aggregators;
+    private ImmutableBytesWritable currentKey;
+    private ImmutableBytesWritable nextKey;
+
+    /**
+     * @param resultIterator source of rows, expected in grouping-key order
+     * @param aggregators aggregators applied to each group
+     * @throws NullPointerException if either argument is null
+     */
+    public BaseGroupedAggregatingResultIterator(
+            PeekingResultIterator resultIterator, Aggregators aggregators) {
+        if (resultIterator == null) throw new NullPointerException("resultIterator");
+        if (aggregators == null) throw new NullPointerException("aggregators");
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.currentKey = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
+        this.nextKey = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
+    }
+
+    /** Computes the grouping key of {@code tuple} into {@code ptr} and returns it. */
+    protected abstract ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException;
+    /** Wraps the aggregated key value in the tuple type returned by {@link #next()}. */
+    protected abstract Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException;
+
+    /**
+     * Aggregates the next run of consecutive rows that share a grouping key.
+     *
+     * @return a tuple wrapping the aggregated value keyed by the group's key, or
+     *         {@code null} when the underlying iterator is exhausted
+     */
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple result = resultIterator.next();
+        if (result == null) {
+            return null;
+        }
+        if (currentKey.get() == UNINITIALIZED_KEY_BUFFER) {
+            getGroupingKey(result, currentKey);
+        }
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        while (true) {
+            aggregators.aggregate(rowAggregators, result);
+            Tuple nextResult = resultIterator.peek();
+            if (nextResult == null || !currentKey.equals(getGroupingKey(nextResult, nextKey))) {
+                break;
+            }
+            result = resultIterator.next();
+        }
+
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        // The key peeked for the following row becomes the current key of the next call.
+        currentKey.set(nextKey.get(), nextKey.getOffset(), nextKey.getLength());
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        resultIterator.close();
+    }
+
+    /** Resets the aggregators and then aggregates the single given row. */
+    @Override
+    public void aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java index 50e1bc2..db08696 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java @@ -17,19 +17,13 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.query.QueryConstants.*; - import java.sql.SQLException; -import java.util.List; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - -import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.KeyValueUtil; -import org.apache.phoenix.util.TupleUtil; @@ -51,54 +45,20 @@ import org.apache.phoenix.util.TupleUtil; * * @since 0.1 */ -public class GroupedAggregatingResultIterator implements AggregatingResultIterator { - private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); - private final PeekingResultIterator resultIterator; - protected final Aggregators aggregators; - - public GroupedAggregatingResultIterator( PeekingResultIterator 
resultIterator, Aggregators aggregators) { - if (resultIterator == null) throw new NullPointerException(); - if (aggregators == null) throw new NullPointerException(); - this.resultIterator = resultIterator; - this.aggregators = aggregators; - } - - @Override - public Tuple next() throws SQLException { - Tuple result = resultIterator.next(); - if (result == null) { - return null; - } - Aggregator[] rowAggregators = aggregators.getAggregators(); - aggregators.reset(rowAggregators); - while (true) { - aggregators.aggregate(rowAggregators, result); - Tuple nextResult = resultIterator.peek(); - if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) { - break; - } - result = resultIterator.next(); - } - - byte[] value = aggregators.toBytes(rowAggregators); - result.getKey(tempPtr); - return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); - } - - @Override - public void close() throws SQLException { - resultIterator.close(); +public class GroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator { + + public GroupedAggregatingResultIterator(PeekingResultIterator resultIterator, Aggregators aggregators) { + super(resultIterator, aggregators); } @Override - public void aggregate(Tuple result) { - Aggregator[] rowAggregators = aggregators.getAggregators(); - aggregators.reset(rowAggregators); - aggregators.aggregate(rowAggregators, result); + protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException { + tuple.getKey(ptr); + return ptr; } @Override - public void explain(List<String> planSteps) { - resultIterator.explain(planSteps); + protected Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException { + return new SingleKeyValueTuple(keyValue); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index a7f390f..3293f65 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -25,7 +25,11 @@ import org.apache.phoenix.schema.tuple.Tuple; abstract public class LookAheadResultIterator implements PeekingResultIterator { - public static LookAheadResultIterator wrap(final ResultIterator iterator) { + public static PeekingResultIterator wrap(final ResultIterator iterator) { + if (iterator instanceof PeekingResultIterator) { + return (PeekingResultIterator) iterator; + } + return new LookAheadResultIterator() { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java deleted file mode 100644 index 8377b03..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.join; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.WritableUtils; -import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper; -import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.expression.ExpressionType; -import org.apache.phoenix.schema.KeyValueSchema; -import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.ValueBitSet; -import org.apache.phoenix.schema.tuple.BaseTuple; -import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.KeyValueUtil; -import org.apache.phoenix.util.SchemaUtil; - -public class TupleProjector { - public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v"); - public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0]; - - private static final String SCAN_PROJECTOR = "scanProjector"; - - private final KeyValueSchema schema; - private final Expression[] expressions; - private ValueBitSet valueSet; - private final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - - public TupleProjector(ProjectedPTableWrapper projected) { - 
List<PColumn> columns = projected.getTable().getColumns(); - expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()]; - // we do not count minNullableIndex for we might do later merge. - KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); - int i = 0; - for (PColumn column : projected.getTable().getColumns()) { - if (!SchemaUtil.isPKColumn(column)) { - builder.addField(column); - expressions[i++] = projected.getSourceExpression(column); - } - } - schema = builder.build(); - valueSet = ValueBitSet.newInstance(schema); - } - - private TupleProjector(KeyValueSchema schema, Expression[] expressions) { - this.schema = schema; - this.expressions = expressions; - this.valueSet = ValueBitSet.newInstance(schema); - } - - public void setValueBitSet(ValueBitSet bitSet) { - this.valueSet = bitSet; - } - - public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - try { - DataOutputStream output = new DataOutputStream(stream); - projector.schema.write(output); - int count = projector.expressions.length; - WritableUtils.writeVInt(output, count); - for (int i = 0; i < count; i++) { - WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal()); - projector.expressions[i].write(output); - } - scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - try { - stream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - } - - public static TupleProjector deserializeProjectorFromScan(Scan scan) { - byte[] proj = scan.getAttribute(SCAN_PROJECTOR); - if (proj == null) { - return null; - } - ByteArrayInputStream stream = new ByteArrayInputStream(proj); - try { - DataInputStream input = new DataInputStream(stream); - KeyValueSchema schema = new KeyValueSchema(); - schema.readFields(input); - int count = 
WritableUtils.readVInt(input); - Expression[] expressions = new Expression[count]; - for (int i = 0; i < count; i++) { - int ordinal = WritableUtils.readVInt(input); - expressions[i] = ExpressionType.values()[ordinal].newInstance(); - expressions[i].readFields(input); - } - return new TupleProjector(schema, expressions); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - try { - stream.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - public static class ProjectedValueTuple extends BaseTuple { - private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(); - private long timestamp; - private byte[] projectedValue; - private int bitSetLen; - private KeyValue keyValue; - - private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) { - this.keyPtr.set(keyBuffer, keyOffset, keyLength); - this.timestamp = timestamp; - this.projectedValue = projectedValue; - this.bitSetLen = bitSetLen; - } - - public ImmutableBytesWritable getKeyPtr() { - return keyPtr; - } - - public long getTimestamp() { - return timestamp; - } - - public byte[] getProjectedValue() { - return projectedValue; - } - - public int getBitSetLength() { - return bitSetLen; - } - - @Override - public void getKey(ImmutableBytesWritable ptr) { - ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength()); - } - - @Override - public KeyValue getValue(int index) { - if (index != 0) { - throw new IndexOutOfBoundsException(Integer.toString(index)); - } - return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER); - } - - @Override - public KeyValue getValue(byte[] family, byte[] qualifier) { - if (keyValue == null) { - keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), - VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length); - } - return keyValue; - } - - @Override - public boolean 
getValue(byte[] family, byte[] qualifier, - ImmutableBytesWritable ptr) { - ptr.set(projectedValue); - return true; - } - - @Override - public boolean isImmutable() { - return true; - } - - @Override - public int size() { - return 1; - } - } - - public ProjectedValueTuple projectResults(Tuple tuple) { - byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr); - KeyValue base = tuple.getValue(0); - return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength()); - } - - public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException { - boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr); - if (!b) - throw new IOException("Trying to decode a non-projected value."); - } - - public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet, - Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException { - ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue()); - destBitSet.clear(); - destBitSet.or(destValue); - int origDestBitSetLen = dest.getBitSetLength(); - ImmutableBytesWritable srcValue = new ImmutableBytesWritable(); - decodeProjectedValue(src, srcValue); - srcBitSet.clear(); - srcBitSet.or(srcValue); - int origSrcBitSetLen = srcBitSet.getEstimatedLength(); - for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) { - if (srcBitSet.get(i)) { - destBitSet.set(offset + i); - } - } - int destBitSetLen = destBitSet.getEstimatedLength(); - byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen]; - int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen); - o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - 
origSrcBitSetLen); - destBitSet.toBytes(merged, o); - ImmutableBytesWritable keyPtr = dest.getKeyPtr(); - return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen); - } - - @Override - public String toString() { - return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}"; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 132c831..c7bc944 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -101,6 +101,7 @@ public class QueryOptimizer { if (!useIndexes || select.isJoin() || dataPlan.getContext().getResolver().getTables().size() > 1 + || select.getInnerSelectStatement() != null || (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) { return Collections.singletonList(dataPlan); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 2242cd0..5aaf04d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -629,8 +629,8 @@ public class ParseNodeFactory { statement.hasSequence()); } - public SelectStatement select(SelectStatement statement, List<AliasedNode> select, 
ParseNode where, List<ParseNode> groupBy, boolean isAggregate) { - return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), select, where, groupBy, + public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) { + return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy, statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate, statement.hasSequence()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java index 6cee588..e7302dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java @@ -192,4 +192,11 @@ public class SelectStatement implements FilterableStatement { public boolean isJoin() { return fromTable.size() > 1 || (fromTable.size() > 0 && fromTable.get(0) instanceof JoinTableNode); } + + public SelectStatement getInnerSelectStatement() { + if (fromTable.size() != 1 || !(fromTable.get(0) instanceof DerivedTableNode)) + return null; + + return ((DerivedTableNode) fromTable.get(0)).getSelect(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java index 0910712..f1a0028 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java @@ -114,7 +114,7 @@ public final class ColumnRef { return new KeyValueColumnExpression(column, displayName); } - if (table.getType() == PTableType.JOIN) { + if (table.getType() == PTableType.JOIN || table.getType() == PTableType.SUBQUERY) { return new ProjectedColumnExpression(column, table, column.getName().getString()); }
