http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java index 212d0ba,0000000..966d552 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java @@@ -1,535 -1,0 +1,538 @@@ +package org.apache.phoenix.calcite; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.StructKind; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Pair; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.CoerceExpression; +import 
org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PDatum; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.ProjectedColumn; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; ++import org.apache.phoenix.schema.PTable.EncodedCQCounter; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.ByteUtil; ++import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class TableMapping { + private final TableRef tableRef; + private final TableRef dataTableRef; + private final List<PColumn> mappedColumns; + private final int extendedColumnsOffset; + private final TableRef extendedTableRef; + // For column resolving + private final List<String> names = Lists.newArrayList(); + private final Map<String, Map<String, Integer>> groupMap = Maps.newHashMap(); + + public TableMapping(PTable table) { + this.tableRef = new TableRef(table); + this.dataTableRef = null; + this.mappedColumns = getMappedColumns(table); + this.extendedColumnsOffset = mappedColumns.size(); + this.extendedTableRef = null; + init(); + } + + public TableMapping(TableRef tableRef, TableRef 
dataTableRef, boolean extend) throws SQLException { + this.tableRef = tableRef; + this.dataTableRef = dataTableRef; + if (!extend) { + this.mappedColumns = getMappedColumns(tableRef.getTable()); + this.extendedColumnsOffset = mappedColumns.size(); + this.extendedTableRef = null; + } else { + this.mappedColumns = Lists.newArrayList(); + this.mappedColumns.addAll(getMappedColumns(tableRef.getTable())); + this.extendedColumnsOffset = mappedColumns.size(); + final PTable dataTable = dataTableRef.getTable(); + final List<PColumn> projectedColumns = getDataTableMappedColumns(dataTableRef, mappedColumns); + this.mappedColumns.addAll(projectedColumns); ++ EncodedCQCounter cqCounter = EncodedCQCounter.NULL_COUNTER; ++ if (EncodedColumnsUtil.usesEncodedColumnNames(dataTable)) { ++ cqCounter = EncodedCQCounter.copy(dataTable.getEncodedCQCounter()); ++ } + PTable extendedTable = PTableImpl.makePTable(dataTable.getTenantId(), + TupleProjectionCompiler.PROJECTED_TABLE_SCHEMA, dataTable.getName(), + PTableType.PROJECTED, null, dataTable.getTimeStamp(), + dataTable.getSequenceNumber(), dataTable.getPKName(), null, + projectedColumns, null, null, Collections.<PTable>emptyList(), + dataTable.isImmutableRows(), Collections.<PName>emptyList(), null, null, + dataTable.isWALDisabled(), false, dataTable.getStoreNulls(), + dataTable.getViewType(), null, null, dataTable.rowKeyOrderOptimizable(), + dataTable.isTransactional(), dataTable.getUpdateCacheFrequency(), + dataTable.getIndexDisableTimestamp(), dataTable.isNamespaceMapped(), - dataTable.getAutoPartitionSeqName(), dataTable.isAppendOnlySchema()); ++ dataTable.getAutoPartitionSeqName(), dataTable.isAppendOnlySchema(), ++ dataTable.getImmutableStorageScheme(), dataTable.getEncodingScheme(), cqCounter); + this.extendedTableRef = new TableRef(extendedTable); + } + init(); + } + + private void init() { + Set<String> nameSet = Sets.newHashSet(); + boolean dup = false; + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = 
mappedColumns.get(i); + String familyName = column.getFamilyName() == null ? "" : column.getFamilyName().getString(); + String name = column.getName().getString(); + Map<String, Integer> subMap = groupMap.get(familyName); + if (subMap == null) { + subMap = Maps.newHashMap(); + groupMap.put(familyName, subMap); + } + subMap.put(name, i); + dup = dup || !nameSet.add(name); + } + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + String familyName = column.getFamilyName() == null ? "" : column.getFamilyName().getString(); + String name = column.getName().getString(); + String translatedName = !dup ? name + : SchemaUtil.getCaseSensitiveColumnDisplayName(familyName, column.getName().getString()); + names.add(translatedName); + } + } + + public TableRef getTableRef() { + return tableRef; + } + + public PTable getPTable() { + return tableRef.getTable(); + } + + public TableRef getDataTableRef() { + return dataTableRef; + } + + public List<String> getColumnNames() { + return names; + } + + public List<PColumn> getMappedColumns() { + return mappedColumns; + } + + public boolean hasExtendedColumns() { + return extendedTableRef != null; + } + + public List<Pair<RelDataTypeField, List<String>>> resolveColumn( + RelDataType rowType, RelDataTypeFactory typeFactory, List<String> names) { + List<Pair<RelDataTypeField, List<String>>> ret = new ArrayList<>(); + if (names.size() >= 2) { + Map<String, Integer> subMap = groupMap.get(names.get(0)); + if (subMap != null) { + Integer index = subMap.get(names.get(1)); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), + names.subList(2, names.size()))); + } + } + } + + final String columnName = names.get(0); + final List<String> remainder = names.subList(1, names.size()); + for (int i = 0; i < this.names.size(); i++) { + if (columnName.equals(this.names.get(i))) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + 
rowType.getFieldList().get(i), remainder)); + return ret; + } + } + + final List<String> priorityGroups = Arrays.asList("", QueryConstants.DEFAULT_COLUMN_FAMILY); + for (String group : priorityGroups) { + Map<String, Integer> subMap = groupMap.get(group); + if (subMap != null) { + Integer index = subMap.get(columnName); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), remainder)); + return ret; + } + } + } + for (Map.Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) { + if (priorityGroups.contains(entry.getKey())) { + continue; + } + Integer index = entry.getValue().get(columnName); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), remainder)); + } + } + + if (ret.isEmpty() && names.size() == 1) { + Map<String, Integer> subMap = groupMap.get(columnName); + if (subMap != null) { + List<Map.Entry<String, Integer>> entries = + new ArrayList<>(subMap.entrySet()); + Collections.sort( + entries, + new Comparator<Map.Entry<String, Integer>>() { + @Override public int compare( + Entry<String, Integer> o1, Entry<String, Integer> o2) { + return o1.getValue() - o2.getValue(); + } + }); + ret.add( + new Pair<RelDataTypeField, List<String>>( + new RelDataTypeFieldImpl( + columnName, -1, + createStructType( + rowType, + typeFactory, + entries)), + remainder)); + } + } + + return ret; + } + + private static RelDataType createStructType( + final RelDataType rowType, + RelDataTypeFactory typeFactory, + final List<Map.Entry<String, Integer>> entries) { + return typeFactory.createStructType( + StructKind.PEEK_FIELDS, + new AbstractList<RelDataType>() { + @Override public RelDataType get(int index) { + final int i = entries.get(index).getValue(); + return rowType.getFieldList().get(i).getType(); + } + @Override public int size() { + return entries.size(); + } + }, + new AbstractList<String>() { + @Override public String get(int index) { + 
return entries.get(index).getKey(); + } + @Override public int size() { + return entries.size(); + } + }); + } + + public Expression newColumnExpression(int index) { + ColumnRef colRef = new ColumnRef( + index < extendedColumnsOffset ? tableRef : extendedTableRef, + this.mappedColumns.get(index).getPosition()); + try { + return colRef.newColumnExpression(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public ImmutableBitSet getDefaultExtendedColumnRef() { + return ImmutableBitSet.range(extendedColumnsOffset, mappedColumns.size()); + } + + public ImmutableBitSet getExtendedColumnRef(List<RexNode> exprs) { + if (!hasExtendedColumns()) { + return ImmutableBitSet.of(); + } + + ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); + for (RexNode expr : exprs) { + builder.addAll(InputFinder.analyze(expr).inputBitSet.build()); + } + for (int i = 0; i < extendedColumnsOffset; i++) { + builder.clear(i); + } + return builder.build(); + } + + public org.apache.hadoop.hbase.util.Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) { + Set<String> cf = Sets.newHashSet(); + int columnCount = 0; + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + if (columnRef.get(i)) { + PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i)) + .getSourceColumnRef().getColumn(); + cf.add(dataColumn.getFamilyName().getString()); + columnCount++; + } + } + return new org.apache.hadoop.hbase.util.Pair<Integer, Integer>(cf.size(), columnCount); + } + + public PTable createProjectedTable(boolean retainPKColumns) { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + List<PColumn> columns = retainPKColumns ? 
+ tableRef.getTable().getColumns() : mappedColumns.subList(0, extendedColumnsOffset); + for (PColumn column : columns) { + sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition())); + } + if (extendedColumnsOffset < mappedColumns.size()) { + for (PColumn column : mappedColumns.subList(extendedColumnsOffset, mappedColumns.size())) { + sourceColumnRefs.add(new ColumnRef(extendedTableRef, column.getPosition())); + } + } + + try { + return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public TupleProjector createTupleProjector(boolean retainPKColumns) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + if (!SchemaUtil.isPKColumn(mappedColumns.get(i)) || !retainPKColumns) { + Expression expr = newColumnExpression(i); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + public RowProjector createRowProjector() throws SQLException { + return createRowProjector(null); + } + + public RowProjector createRowProjector(List<PColumn> targetColumns) throws SQLException { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + Expression expr = newColumnExpression(i); // Do not use column.position() here. 
+ if (targetColumns != null) { + PDatum targetColumn = targetColumns.get(i); + if (targetColumn.getDataType() != expr.getDataType()) { + PDataType<?> targetType = targetColumn.getDataType(); + assert expr.getDataType() == null || expr.getDataType().isCastableTo(targetType); + expr = CoerceExpression.create(expr, targetType, targetColumn.getSortOrder(), targetColumn.getMaxLength()); + } + } + columnProjectors.add(new ExpressionProjector(column.getName().getString(), tableRef.getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + public void setupScanForExtendedTable(Scan scan, ImmutableBitSet extendedColumnRef, + PhoenixConnection connection) throws SQLException { + if (extendedTableRef == null || extendedColumnRef.isEmpty()) { + return; + } + + TableRef dataTableRef = null; + List<PColumn> dataColumns = Lists.newArrayList(); + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + ProjectedColumn column = (ProjectedColumn) mappedColumns.get(i); + builder.addField(column); + if (extendedColumnRef.get(i)) { + dataColumns.add(column.getSourceColumnRef().getColumn()); + exprs.add(column.getSourceColumnRef().newColumnExpression()); + if (dataTableRef == null) { + dataTableRef = column.getSourceColumnRef().getTableRef(); + } + } else { + exprs.add(LiteralExpression.newConstant(null)); + } + } + if (dataColumns.isEmpty()) { + return; + } + + // Set data columns to be join back from data table. + serializeDataTableColumnsToJoin(scan, dataColumns); + // Set tuple projector of the data columns. 
+ TupleProjector projector = new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + TupleProjector.serializeProjectorIntoScan(scan, projector, IndexUtil.INDEX_PROJECTOR); + PTable dataTable = dataTableRef.getTable(); + // Set index maintainer of the local index. + serializeIndexMaintainerIntoScan(scan, dataTable, connection); + // Set view constants if exists. + serializeViewConstantsIntoScan(scan, dataTable); + } + + private static void serializeDataTableColumnsToJoin(Scan scan, List<PColumn> dataColumns) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, dataColumns.size()); + for (PColumn column : dataColumns) { + Bytes.writeByteArray(output, column.getFamilyName().getBytes()); + Bytes.writeByteArray(output, column.getName().getBytes()); + } + scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable, PhoenixConnection connection) throws SQLException { + PName name = getPTable().getName(); + List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); + for (PTable index : dataTable.getIndexes()) { + if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) { + indexes.add(index); + break; + } + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(dataTable, ptr, indexes, connection); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } + } + + private static void 
serializeViewConstantsIntoScan(Scan scan, PTable dataTable) { + int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0); + int nViewConstants = 0; + if (dataTable.getType() == PTableType.VIEW) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + List<PColumn> dataPkColumns = dataTable.getPKColumns(); + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPKColumn = dataPkColumns.get(i); + if (dataPKColumn.getViewConstant() != null) { + nViewConstants++; + } + } + if (nViewConstants > 0) { + byte[][] viewConstants = new byte[nViewConstants][]; + int j = 0; + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPkColumn = dataPkColumns.get(i); + if (dataPkColumn.getViewConstant() != null) { + if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) { + viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } else { + throw new IllegalStateException(); + } + } + } + serializeViewConstantsIntoScan(viewConstants, scan); + } + } + } + + private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, viewConstants.length); + for (byte[] viewConstant : viewConstants) { + Bytes.writeByteArray(output, viewConstant); + } + scan.setAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static List<PColumn> getMappedColumns(PTable pTable) { + int initPosition = + (pTable.getBucketNum() ==null ? 0 : 1) + + (pTable.isMultiTenant() ? 1 : 0) + + (pTable.getViewIndexId() == null ? 
0 : 1); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(pTable.getColumns().size() - initPosition); - for (int i = initPosition; i < pTable.getPKColumns().size(); i++) { - columns.add(pTable.getPKColumns().get(i)); - } - for (PColumnFamily family : pTable.getColumnFamilies()) { - for (PColumn column : family.getColumns()) { - columns.add(column); - } ++ for (int i = initPosition; i < pTable.getColumns().size(); i++) { ++ columns.add(pTable.getColumns().get(i)); + } + + return columns; + } + + private static List<PColumn> getDataTableMappedColumns(TableRef dataTableRef, List<PColumn> mappedColumns) { + Set<String> names = Sets.newHashSet(); + for (PColumn column : mappedColumns) { + names.add(column.getName().getString()); + } + List<PColumn> projectedColumns = new ArrayList<PColumn>(); + for (PColumnFamily cf : dataTableRef.getTable().getColumnFamilies()) { + for (PColumn sourceColumn : cf.getColumns()) { + String colName = IndexUtil.getIndexColumnName(sourceColumn); + if (!names.contains(colName)) { + ColumnRef sourceColumnRef = + new ColumnRef(dataTableRef, sourceColumn.getPosition()); + PColumn column = new ProjectedColumn(PNameFactory.newName(colName), + cf.getName(), projectedColumns.size(), - sourceColumn.isNullable(), sourceColumnRef); ++ sourceColumn.isNullable(), sourceColumnRef, ++ sourceColumn.getColumnQualifierBytes()); + projectedColumns.add(column); + } + } + } + + return projectedColumns; + } +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index 6f0c806,0000000..c4a7d49 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@@ -1,141 -1,0 +1,145 @@@ +package org.apache.phoenix.calcite.rel; + ++import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; ++import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; ++ +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.SequenceValueExpression; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.BindParameterExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; ++import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import 
org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.types.PDataType; +import com.google.common.collect.Lists; + +public class PhoenixRelImplementorImpl implements PhoenixRelImplementor { + private final StatementContext statementContext; + private final RuntimeContext runtimeContext; + private Stack<ImplementorContext> contextStack; + private TableMapping tableMapping; + + public PhoenixRelImplementorImpl( + StatementContext statementContext, RuntimeContext runtimeContext) { + this.statementContext = statementContext; + this.runtimeContext = runtimeContext; + this.contextStack = new Stack<ImplementorContext>(); + } + + @Override + public QueryPlan visitInput(int i, PhoenixQueryRel input) { + return input.implement(this); + } + + @Override + public Expression newColumnExpression(int index) { + return tableMapping.newColumnExpression(index); + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newBindParameterExpression(int index, PDataType type, Integer maxLength) { + return new BindParameterExpression(index, type, maxLength, runtimeContext); + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + Expression fieldAccessExpr = runtimeContext.getCorrelateVariable(variableId).newExpression(index); + return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); + } + + @Override + public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) { + PName tenantName = seq.pc.getTenantId(); + TableName tableName = TableName.create(seq.schemaName, seq.sequenceName); + try { + return statementContext.getSequenceManager().newSequenceReference(tenantName, tableName, null, op); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public StatementContext 
getStatementContext() { + return statementContext; + } + + @Override + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @Override + public void setTableMapping(TableMapping tableMapping) { + this.tableMapping = tableMapping; + } + + @Override + public TableMapping getTableMapping() { + return this.tableMapping; + } + + @Override + public void pushContext(ImplementorContext context) { + this.contextStack.push(context); + } + + @Override + public ImplementorContext popContext() { + return contextStack.pop(); + } + + @Override + public ImplementorContext getCurrentContext() { + return contextStack.peek(); + } + + @Override + public TupleProjector project(List<Expression> exprs) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < exprs.size(); i++) { + String name = ParseNodeFactory.createTempAlias(); + Expression expr = exprs.get(i); + builder.addField(expr); - columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), ++ columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(QueryConstants.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), - i, expr.getSortOrder(), null, null, false, name, false, false)); ++ i, expr.getSortOrder(), null, null, false, name, false, false, null)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, true, false, 0, 0, false, null, false); ++ null, null, true, false, 0, 0, false, null, false, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); + 
this.setTableMapping(new TableMapping(pTable)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java index f09de7c,0000000..335facf mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java @@@ -1,325 -1,0 +1,323 @@@ +package org.apache.phoenix.calcite.rel; + +import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; + +import java.sql.ParameterMetaData; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare.CatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.StatementPlan; +import 
org.apache.phoenix.compile.UpsertCompiler; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.execute.MutationState.RowMutationState; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PRow; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class PhoenixTableModify extends TableModify implements PhoenixRel { + + public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, CatalogReader catalogReader, RelNode child, + Operation operation, List<String> updateColumnList, List<RexNode> sourceExpressionList, + boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, + updateColumnList, sourceExpressionList, flattened); + assert operation == Operation.INSERT || operation == Operation.DELETE; + } + + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new PhoenixTableModify( + getCluster(), + traitSet, + getTable(), + getCatalogReader(), + sole(inputs), + getOperation(), + getUpdateColumnList(), + getSourceExpressionList(), + isFlattened()); + } + + @Override + public StatementPlan implement(PhoenixRelImplementor implementor) { + final PhoenixTable targetTable = 
getTable().unwrap(PhoenixTable.class); + final PhoenixConnection connection = targetTable.pc; + final TableRef targetTableRef = new TableRef(targetTable.tableMapping.getTableRef()); + + final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input); + RowProjector projector; + try { + final List<PColumn> targetColumns = + getOperation() == Operation.DELETE + ? null : targetTable.tableMapping.getMappedColumns(); + projector = implementor.getTableMapping().createRowProjector(targetColumns); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (getOperation() == Operation.INSERT) { + return upsert(connection, targetTable, targetTableRef, queryPlan, projector); + } + + // delete + return delete(connection, targetTable, targetTableRef, queryPlan, projector); + } + + private static MutationPlan upsert(final PhoenixConnection connection, + final PhoenixTable targetTable, final TableRef targetTableRef, + final QueryPlan queryPlan, final RowProjector projector) { + try (PhoenixStatement stmt = new PhoenixStatement(connection)) { + final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef); + final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + + // TODO TenantId, ViewIndexId, UpdatableViewColumns + final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns(); + final int[] columnIndexes = new int[mappedColumns.size()]; + final int[] pkSlotIndexes = new int[mappedColumns.size()]; ++ int pkColPosition = -1; + for (int i = 0; i < columnIndexes.length; i++) { + PColumn column = mappedColumns.get(i); - int pkColPosition = 0; + if (SchemaUtil.isPKColumn(column)) { - for(PColumn col: mappedColumns) { - if(col.equals(column)) break; - // Since first columns in the mappedColumns are pk columns only. 
- pkColPosition++; ++ if (pkColPosition == -1) { ++ pkColPosition = column.getPosition(); + } - pkSlotIndexes[i] = pkColPosition; ++ pkSlotIndexes[i] = pkColPosition++; + } + columnIndexes[i] = column.getPosition(); + } + // TODO + final boolean useServerTimestamp = false; + + return new MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return queryPlan.getContext().getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return targetTableRef; + } + + @Override + public Set<TableRef> getSourceRefs() { + // TODO return originalQueryPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } + + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + // simplest version, no run-on-server, no pipelined update + return UpsertCompiler.upsertSelect(context, targetTableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, true); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT SELECT"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static MutationPlan delete(final PhoenixConnection connection, + final PhoenixTable targetTable, final TableRef targetTableRef, + final QueryPlan queryPlan, final RowProjector projector) { + final StatementContext context = queryPlan.getContext(); + // TODO + final boolean deleteFromImmutableIndexToo = false; + return new 
MutationPlan() { + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public TableRef getTargetRef() { + return targetTableRef; + } + + @Override + public Set<TableRef> getSourceRefs() { + // TODO dataPlan.getSourceRefs(); + return queryPlan.getSourceRefs(); + } + + @Override + public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() { + return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE; + } + + @Override + public MutationState execute() throws SQLException { + ResultIterator iterator = queryPlan.iterator(); + try { + // TODO hasLimit?? + return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef()); + } finally { + iterator.close(); + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("DELETE ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } + + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + PTable table = targetTableRef.getTable(); + PhoenixStatement statement = childContext.getStatement(); + PhoenixConnection connection = statement.getConnection(); + PName tenantId = connection.getTenantId(); + byte[] tenantIdBytes = null; + if (tenantId != null) { + tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null); + } + final boolean isAutoCommit = connection.getAutoCommit(); + 
ConnectionQueryServices services = connection.getQueryServices(); + final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize); + Map<ImmutableBytesPtr,RowMutationState> indexMutations = null; + // If indexTableRef is set, we're deleting the rows from both the index table and + // the data table through a single query to save executing an additional one. + if (indexTableRef != null) { + indexMutations = Maps.newHashMapWithExpectedSize(batchSize); + } + List<PColumn> pkColumns = table.getPKColumns(); + boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null; + boolean isSharedViewIndex = table.getViewIndexId() != null; + int offset = (table.getBucketNum() == null ? 0 : 1); + byte[][] values = new byte[pkColumns.size()][]; + if (isMultiTenant) { + values[offset++] = tenantIdBytes; + } + if (isSharedViewIndex) { + values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId()); + } + try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { + int rowCount = 0; + while (rs.next()) { + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + // Use tuple directly, as projector would not have all the PK columns from + // our index table inside of our projection. Since the tables are equal, + // there's no translation required. + if (sourceTableRef.equals(targetTableRef)) { + rs.getCurrentRow().getKey(ptr); + } else { + for (int i = offset; i < values.length; i++) { + byte[] byteValue = rs.getBytes(i+1-offset); + // The ResultSet.getBytes() call will have inverted it - we need to invert it back. 
+ // TODO: consider going under the hood and just getting the bytes + if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) { + byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length); + byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length); + } + values[i] = byteValue; + } + table.newKey(ptr, values); + } + // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the + // row key will already have its value. + mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + if (indexTableRef != null) { + ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map + rs.getCurrentRow().getKey(indexPtr); + indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); + } + if (mutations.size() > maxSize) { + throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); + } + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection); + connection.getMutationState().join(state); + if (indexTableRef != null) { + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + connection.getMutationState().join(indexState); + } + connection.getMutationState().send(); + mutations.clear(); + if (indexMutations != null) { + indexMutations.clear(); + } + } + } + + // If auto commit is true, this last batch will be committed upon return + int nCommittedRows = rowCount / batchSize * batchSize; + MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, 
connection); + if (indexTableRef != null) { + // To prevent the counting of these index rows, we have a negative for remainingRows. + MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection); + state.join(indexState); + } + return state; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 234ac5d,0000000..e50538b mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@@ -1,367 -1,0 +1,367 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; +import java.util.Objects; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.ColumnResolver; +import 
org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.iterate.BaseResultIterators; +import org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; + +/** + * Scan of a Phoenix table. 
+ */ +public class PhoenixTableScan extends TableScan implements PhoenixQueryRel { + public enum ScanOrder { + NONE, + FORWARD, + REVERSE, + } + + public final RexNode filter; + public final ScanOrder scanOrder; + public final ScanRanges scanRanges; + public final ImmutableBitSet extendedColumnRef; + + protected final Long estimatedBytes; + protected final float rowCountFactor; + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table) { + return create(cluster, table, null, + getDefaultScanOrder(table.unwrap(PhoenixTable.class)), null); + } + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table, + RexNode filter, final ScanOrder scanOrder, ImmutableBitSet extendedColumnRef) { + final RelTraitSet traits = + cluster.traitSetOf(PhoenixConvention.SERVER) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + if (scanOrder == ScanOrder.NONE) { + return ImmutableList.of(); + } + List<RelCollation> collations = table.getCollationList(); + return scanOrder == ScanOrder.FORWARD ? 
collations : reverse(collations); + } + }); + return new PhoenixTableScan(cluster, traits, table, filter, scanOrder, extendedColumnRef); + } + + private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, RexNode filter, ScanOrder scanOrder, + ImmutableBitSet extendedColumnRef) { + super(cluster, traits, table); + this.filter = filter; + this.scanOrder = scanOrder; + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + this.rowCountFactor = phoenixTable.pc.getQueryServices() + .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f); + try { + // TODO simplify this code + TableMapping tableMapping = phoenixTable.tableMapping; + PTable pTable = tableMapping.getPTable(); + SelectStatement select = SelectStatement.SELECT_ONE; + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + if (extendedColumnRef == null) { + extendedColumnRef = tableMapping.getDefaultExtendedColumnRef(); + } + this.extendedColumnRef = extendedColumnRef; + if (filter != null) { + assert this.extendedColumnRef.contains( + tableMapping.getExtendedColumnRef(ImmutableList.of(filter))); + // We use an implementor with a special implementation for correlate variables + // or bind parameters here, which translates them into a LiteralExpression + // with a sample value. This will achieve 3 goals at a time: + // 1) avoid getting an exception when translating RexFieldAccess at this time when + // the correlate variable has not been defined yet. + // 2) get a guess of ScanRange even if the runtime value is absent. + // TODO instead of getting a random sample value, we'd better get it from + // existing guidepost bytes. + // 3) test whether this dynamic filter is worth a recompile at runtime. 
+ PhoenixRelImplementor tmpImplementor = new PhoenixRelImplementorImpl( + context, RuntimeContext.EMPTY_CONTEXT) { + @SuppressWarnings("rawtypes") + @Override + public Expression newBindParameterExpression(int index, PDataType type, Integer maxLength) { + try { + return LiteralExpression.newConstant(type.getSampleValue(maxLength), type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + try { + return LiteralExpression.newConstant(type.getSampleValue(), type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }; + tmpImplementor.setTableMapping(tableMapping); + Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + } + this.scanRanges = context.getScanRanges(); + // TODO Get estimated byte count based on column reference list. + this.estimatedBytes = BaseResultIterators.getEstimatedCount(context, pTable).getSecond(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static ScanOrder getDefaultScanOrder(PhoenixTable table) { + //TODO why attribute value not correct in connectUsingModel?? + //return table.pc.getQueryServices().getProps().getBoolean( + // QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, + // QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER) ? 
+ // ScanOrder.FORWARD : ScanOrder.NONE; + return ScanOrder.NONE; + } + + private static List<RelCollation> reverse(List<RelCollation> collations) { + Builder<RelCollation> builder = ImmutableList.<RelCollation>builder(); + for (RelCollation collation : collations) { + builder.add(CalciteUtils.reverseCollation(collation)); + } + return builder.build(); + } + + public boolean isReverseScanEnabled() { + return table.unwrap(PhoenixTable.class).pc + .getQueryServices().getProps().getBoolean( + QueryServices.USE_REVERSE_SCAN_ATTRIB, + QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("filter", filter, filter != null) + .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE) + .itemIf("extendedColumns", extendedColumnRef, !extendedColumnRef.isEmpty()); + } + + @Override public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof PhoenixTableScan)) { + return false; + } + + PhoenixTableScan other = (PhoenixTableScan) obj; + return this.table.equals(other.table) + && Objects.equals(this.filter, other.filter) + && this.scanOrder == other.scanOrder + && this.extendedColumnRef.equals(other.extendedColumnRef); + } + + @Override public int hashCode() { + return table.hashCode(); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + double byteCount; + PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + if (estimatedBytes != null) { + byteCount = estimatedBytes; + } else { + // If stats are not available, we estimate based on selectivity. 
+ int pkCount = scanRanges.getBoundPkColumnCount(); + if (pkCount > 0) { + byteCount = phoenixTable.byteCount * Math.pow(mq.getSelectivity(this, filter), pkCount); + } else { + byteCount = phoenixTable.byteCount; + } + } + Pair<Integer, Integer> columnRefCount = + phoenixTable.tableMapping.getExtendedColumnReferenceCount(extendedColumnRef); + double extendedColumnMultiplier = 1 + columnRefCount.getFirst() * 10 + columnRefCount.getSecond() * 0.1; + byteCount *= extendedColumnMultiplier; + byteCount *= rowCountFactor; + if (scanOrder != ScanOrder.NONE) { + // We don't want to make a big difference here. The idea is to avoid + // forcing row key order whenever the order is absolutely useless. + // E.g. in "select count(*) from t" we do not need the row key order; + // while in "select * from t order by pk0" we should force row key + // order to avoid sorting. + // Another case is "select pk0, count(*) from t", where we'd like to + // choose the row key ordered TableScan rel so that the Aggregate rel + // above it can be a stream aggregate, although at runtime this will + // eventually be an AggregatePlan, in which the "forceRowKeyOrder" + // flag has no effect. 
+ byteCount = addEpsilon(byteCount); + if (scanOrder == ScanOrder.REVERSE) { + byteCount = addEpsilon(byteCount); + } + } + return planner.getCostFactory() + .makeCost(byteCount + 1, byteCount + 1, 0) + .multiplyBy(0.5) /* data scan only */ + .multiplyBy(SERVER_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + double rows = super.estimateRowCount(mq); + if (filter != null && !filter.isAlwaysTrue()) { + rows = rows * mq.getSelectivity(this, filter); + } + + return rows * rowCountFactor; + } + + @Override + public List<RelCollation> getCollationList() { + return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + } + + @Override + public QueryPlan implement(PhoenixRelImplementor implementor) { + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + TableMapping tableMapping = phoenixTable.tableMapping; + implementor.setTableMapping(tableMapping); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_ONE; + ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; + Expression filterExpr = LiteralExpression.newConstant(Boolean.TRUE); + Expression dynamicFilter = null; + if (filter != null) { + ImmutableBitSet bitSet = InputFinder.analyze(filter).inputBitSet.addAll(columnRefList).build(); + columnRefList = ImmutableIntList.copyOf(bitSet.asList()); + filterExpr = CalciteUtils.toExpression(filter, implementor); + } + Expression rem = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, rem, true, false); + // TODO This is not absolutely strict. 
We may have a filter like: + // pk = '0' and pk = $cor0 where $cor0 happens to get a sample value + // as '0', thus making the below test return false and adding an + // unnecessary dynamic filter. This would only be a performance bug though. + if (filter != null && !context.getScanRanges().equals(this.scanRanges)) { + dynamicFilter = filterExpr; + } + tableMapping.setupScanForExtendedTable(context.getScan(), extendedColumnRef, context.getConnection()); + projectColumnFamilies(context.getScan(), tableMapping.getMappedColumns(), columnRefList); + if (implementor.getCurrentContext().forceProject) { + boolean retainPKColumns = implementor.getCurrentContext().retainPKColumns; + TupleProjector tupleProjector = tableMapping.createTupleProjector(retainPKColumns); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = tableMapping.createProjectedTable(retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); + } + OrderBy orderBy = scanOrder == ScanOrder.NONE ? + OrderBy.EMPTY_ORDER_BY + : (scanOrder == ScanOrder.FORWARD ? + OrderBy.FWD_ROW_KEY_ORDER_BY + : OrderBy.REV_ROW_KEY_ORDER_BY); + ParallelIteratorFactory iteratorFactory = null; + TableRef tableRef = tableMapping.getTableRef(); + TableRef srcRef = tableMapping.getDataTableRef() == null ? + tableRef : tableMapping.getDataTableRef(); + // FIXME this is just a temporary fix for schema caching problem. 
+ tableRef.setTimeStamp(QueryConstants.UNSET_TIMESTAMP); + srcRef.setTimeStamp(QueryConstants.UNSET_TIMESTAMP); + return new ScanPlan(context, select, tableRef, srcRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, iteratorFactory, true, dynamicFilter); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void projectColumnFamilies(Scan scan, List<PColumn> mappedColumns, ImmutableIntList columnRefList) { + scan.getFamilyMap().clear(); + for (Integer index : columnRefList) { + PColumn column = mappedColumns.get(index); + PName familyName = column.getFamilyName(); + if (familyName != null) { - scan.addFamily(familyName.getBytes()); ++ scan.addColumn(familyName.getBytes(), column.getColumnQualifierBytes()); + } + } + } + + private double addEpsilon(double d) { + assert d >= 0d; + final double d0 = d; + if (d < 10) { + // For small d, adding 1 would change the value significantly. + d *= 1.001d; + if (d != d0) { + return d; + } + } + // For medium d, add 1. Keeps integral values integral. + ++d; + if (d != d0) { + return d; + } + // For large d, adding 1 might not change the value. Add .1%. + // If d is NaN, this still will probably not change the value. That's OK. 
+ d *= 1.001d; + return d; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index bbc995c,32e9f68..bbbbc8f --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@@ -16,6 -16,7 +16,9 @@@ * limitations under the License. 
*/ package org.apache.phoenix.compile; + import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; ++import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; ++import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import java.sql.SQLException; import java.util.ArrayList; @@@ -143,7 -145,7 +147,7 @@@ public class TupleProjectionCompiler for (LocalIndexDataColumnRef sourceColumnRef : visitor.localIndexColumnRefSet) { PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), sourceColumnRef.getColumn().getFamilyName(), position++, - sourceColumnRef.getColumn().isNullable(), sourceColumnRef); - sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes()); ++ sourceColumnRef.getColumn().isNullable(), sourceColumnRef, null); projectedColumns.add(column); } @@@ -154,9 -156,9 +158,9 @@@ null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); ++ table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); } - + public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { PTable table = tableRef.getTable(); boolean hasSaltingColumn = retainPKColumns && table.getBucketNum() != 
null; @@@ -169,20 -171,23 +173,23 @@@ String aliasedName = tableRef.getTableAlias() == null ? SchemaUtil.getColumnName(table.getName().getString(), colName) : SchemaUtil.getColumnName(tableRef.getTableAlias(), colName); - - PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), - retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? - null : PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), - position++, sourceColumn.isNullable(), sourceColumnRef); + PName familyName = SchemaUtil.isPKColumn(sourceColumn) ? (retainPKColumns ? null : PNameFactory.newName(VALUE_COLUMN_FAMILY)) : sourceColumn.getFamilyName(); + PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), familyName, + position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); } + EncodedCQCounter cqCounter = EncodedCQCounter.NULL_COUNTER; + if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { + cqCounter = EncodedCQCounter.copy(table.getEncodedCQCounter()); + } + return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED, - null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), - retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, - null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, - table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(), - retainPKColumns ? table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), - retainPKColumns ? 
table.getBucketNum() : null, projectedColumns, null, null, - Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), - table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(), ++ retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, ++ null, Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, ++ table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(), ++ retainPKColumns ? table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable(), table.isTransactional(), + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java index c31cb0a,19404f0..63a82f7 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/CoerceExpression.java @@@ -155,6 -155,6 +155,11 @@@ public class CoerceExpression extends B } @Override ++ public boolean isStateless() { ++ return getChild().isStateless(); ++ } ++ ++ @Override public PDataType getDataType() { return toType; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/BaseExpressionVisitor.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/BaseExpressionVisitor.java index 5d8e451,d79b546..9b14f6c --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/BaseExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/BaseExpressionVisitor.java @@@ -35,8 -35,8 +35,9 @@@ import org.apache.phoenix.expression.Mo import 
org.apache.phoenix.expression.MultiplyExpression; import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; +import org.apache.phoenix.expression.ReinterpretCastExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; + import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.StringConcatExpression; import org.apache.phoenix.expression.SubtractExpression; import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java index c7771f9,e47fb64..bf7c083 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java @@@ -40,9 -39,10 +40,11 @@@ import org.apache.phoenix.expression.Mu import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.expression.ReinterpretCastExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; + import org.apache.phoenix.expression.SingleCellColumnExpression; + import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.StringConcatExpression; import org.apache.phoenix.expression.SubtractExpression; import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java index 2d270f3,5936dc7..79a450b --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java @@@ -23,8 -23,8 +23,9 @@@ import java.util.List import org.apache.phoenix.compile.SequenceValueExpression; import org.apache.phoenix.expression.AddExpression; import org.apache.phoenix.expression.AndExpression; + import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.ArrayConstructorExpression; +import org.apache.phoenix.expression.BindParameterExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; @@@ -41,9 -41,9 +42,10 @@@ import org.apache.phoenix.expression.Mu import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.expression.ReinterpretCastExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; + import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.StringConcatExpression; import org.apache.phoenix.expression.SubtractExpression; import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; @@@ -114,8 -111,10 +116,11 @@@ public interface ExpressionVisitor<E> public Iterator<Expression> visitEnter(ArrayConstructorExpression node); public E visitLeave(ArrayConstructorExpression node, 
List<E> l); + public Iterator<Expression> visitEnter(SingleCellConstructorExpression node); + public E visitLeave(SingleCellConstructorExpression node, List<E> l); + public E visit(CorrelateVariableFieldAccessExpression node); + public E visit(BindParameterExpression node); public E visit(LiteralExpression node); public E visit(RowKeyColumnExpression node); public E visit(KeyValueColumnExpression node); http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java index 2c0f4e1,f5615be..bdf5253 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java @@@ -39,9 -38,10 +39,11 @@@ import org.apache.phoenix.expression.Mu import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.expression.ReinterpretCastExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; + import org.apache.phoenix.expression.SingleCellColumnExpression; + import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.StringConcatExpression; import org.apache.phoenix.expression.SubtractExpression; import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java 
---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java index f108c46,7f447b3..61bee9d --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java @@@ -39,9 -38,10 +39,11 @@@ import org.apache.phoenix.expression.Mu import org.apache.phoenix.expression.NotExpression; import org.apache.phoenix.expression.OrExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.expression.ReinterpretCastExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; + import org.apache.phoenix.expression.SingleCellColumnExpression; + import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.StringConcatExpression; import org.apache.phoenix.expression.SubtractExpression; import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 9ccc397,3e2c9b5..db7fac8 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@@ -440,19 -454,20 +455,25 @@@ public class IndexUtil public static TupleProjector getTupleProjector(Scan scan, ColumnReference[] dataColumns) { if (dataColumns != null && dataColumns.length != 0) { + TupleProjector projector = TupleProjector.deserializeProjectorFromScan(scan, INDEX_PROJECTOR); + if (projector != null) { + return projector; + } + KeyValueSchema keyValueSchema = deserializeLocalIndexJoinSchemaFromScan(scan); - KeyValueColumnExpression[] keyValueColumns = new KeyValueColumnExpression[dataColumns.length]; + boolean storeColsInSingleCell = scan.getAttribute(BaseScannerRegionObserver.COLUMNS_STORED_IN_SINGLE_CELL) != null; + QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + Expression[] colExpressions = storeColsInSingleCell ? new SingleCellColumnExpression[dataColumns.length] : new KeyValueColumnExpression[dataColumns.length]; for (int i = 0; i < dataColumns.length; i++) { - ColumnReference dataColumn = dataColumns[i]; - KeyValueColumnExpression dataColumnExpr = new KeyValueColumnExpression(keyValueSchema.getField(i), dataColumn.getFamily(), dataColumn.getQualifier()); - keyValueColumns[i] = dataColumnExpr; + byte[] family = dataColumns[i].getFamily(); + byte[] qualifier = dataColumns[i].getQualifier(); + Field field = keyValueSchema.getField(i); + Expression dataColumnExpr = + storeColsInSingleCell ? 
new SingleCellColumnExpression(field, family, qualifier, encodingScheme) + : new KeyValueColumnExpression(field, family, qualifier); + colExpressions[i] = dataColumnExpr; } - return new TupleProjector(keyValueSchema, keyValueColumns); + return new TupleProjector(keyValueSchema, colExpressions); } return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ----------------------------------------------------------------------
