http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index 658b4cc..3d178f6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -104,11 +104,11 @@ public class TenantCacheImpl implements TenantCache { } @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException { + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException { MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, txState, chunk); + Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer); getServerCaches().put(cacheId, element); success = true; return element;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java index 07df105..b482998 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java @@ -38,6 +38,7 @@ import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -239,7 +240,7 @@ public class CreateTableCompiler { } } - private static class ViewWhereExpressionVisitor extends StatelessTraverseNoExpressionVisitor<Boolean> { + public static class ViewWhereExpressionVisitor extends StatelessTraverseNoExpressionVisitor<Boolean> { private boolean isUpdatable = true; private final PTable table; private int position; @@ -318,13 +319,18 @@ public class CreateTableCompiler { @Override public Boolean visit(KeyValueColumnExpression node) { try { - this.position = table.getColumnFamily(node.getColumnFamily()).getColumn(node.getColumnName()).getPosition(); + this.position = table.getColumnFamily(node.getColumnFamily()).getPColumnForColumnQualifier(node.getColumnQualifier()).getPosition(); } catch (SQLException e) { throw new RuntimeException(e); // Impossible } return Boolean.TRUE; } + @Override + public Boolean visit(SingleCellColumnExpression node) { + return visit(node.getKeyValueExpression()); + } + } private static class VarbinaryDatum implements PDatum { http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 602cd6b..cee545a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -585,7 +585,7 @@ public class DeleteCompiler { if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - context.getScan().setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java index c05918b..fb4c542 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java @@ -117,12 +117,26 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; -import org.apache.phoenix.schema.types.*; +import org.apache.phoenix.schema.types.PArrayDataType; +import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PUnsignedTimestamp; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.schema.types.PhoenixArray; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -386,7 +400,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio } protected void addColumn(PColumn column) { - context.getScan().addColumn(column.getFamilyName().getBytes(), column.getName().getBytes()); + EncodedColumnsUtil.setColumns(column, context.getCurrentTable().getTable(), context.getScan()); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 8e4d9aa..f401aad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; + import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Map; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.expression.Expression; @@ -71,6 +74,8 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; @@ -125,10 +130,12 @@ public class FromCompiler { throw new ColumnNotFoundException(schemaName, tableName, null, colName); } + @Override public PFunction resolveFunction(String functionName) throws SQLException { throw new FunctionNotFoundException(functionName); } + @Override public boolean hasUDFs() { return false; } @@ -257,7 +264,7 @@ public class FromCompiler { Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression(); PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(), sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), - column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic()); + column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifierBytes()); projectedColumns.add(projectedColumn); } PTable t = PTableImpl.makePTable(table, projectedColumns); @@ -332,26 +339,28 @@ public class FromCompiler { private final String alias; private final List<PSchema> schemas; - public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean isNamespaceMapped) throws SQLException { - super(connection, 0, false, udfParseNodes); - List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); - for (ColumnDef def : table.getDynamicColumns()) { - if (def.getColumnDefName().getFamilyName() != null) { - families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList())); - } + public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean isNamespaceMapped) throws SQLException { + super(connection, 0, false, udfParseNodes); + List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); + for (ColumnDef def : table.getDynamicColumns()) { + if (def.getColumnDefName().getFamilyName() != null) { + families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList()));//, NON_ENCODED_QUALIFIERS)); + } } Long scn = connection.getSCN(); String schema = table.getName().getSchemaName(); if (connection.getSchema() != null) { schema = schema != null ? schema : connection.getSchema(); } - PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), + // Storage scheme and encoding scheme don't matter here since the PTable is being used only for the purposes of create table. + // The actual values of these two will be determined by the metadata client. + PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), scn == null ? HConstants.LATEST_TIMESTAMP : scn, families, isNamespaceMapped); - theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); - alias = null; - tableRefs = ImmutableList.of(new TableRef(alias, theTable, timeStamp, !table.getDynamicColumns().isEmpty())); - schemas = ImmutableList.of(new PSchema(theTable.getSchemaName().toString(), timeStamp)); - } + theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); + alias = null; + tableRefs = ImmutableList.of(new TableRef(alias, theTable, timeStamp, !table.getDynamicColumns().isEmpty())); + schemas = ImmutableList.of(new PSchema(theTable.getSchemaName().toString(), timeStamp)); + } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1)); @@ -447,8 +456,8 @@ public class FromCompiler { } PColumn column = resolveCF - ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName) - : tableRef.getTable().getColumn(colName); + ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) + : tableRef.getTable().getColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } @@ -672,7 +681,7 @@ public class FromCompiler { familyName = PNameFactory.newName(family); } allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(), - dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true)); + dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true, Bytes.toBytes(dynColumn.getColumnDefName().getColumnName()))); position++; } theTable = PTableImpl.makePTable(theTable, allcolumns); @@ -774,16 +783,17 @@ public class FromCompiler { // referenced by an outer wild-card select. alias = String.valueOf(position); } + PName name = PNameFactory.newName(alias); PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias), PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY), - null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false); + null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false, name.getBytes()); columns.add(column); } PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false, false, null, null, null, false, false, 0, 0L, SchemaUtil - .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false); + .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); @@ -858,7 +868,7 @@ public class FromCompiler { while (iterator.hasNext()) { TableRef tableRef = iterator.next(); try { - PColumn column = tableRef.getTable().getColumn(colName); + PColumn column = tableRef.getTable().getColumnForColumnName(colName); if (theTableRef != null) { throw new AmbiguousColumnException(colName); } theTableRef = tableRef; theColumnPosition = column.getPosition(); @@ -871,12 +881,12 @@ public class FromCompiler { } else { try { TableRef tableRef = resolveTable(schemaName, tableName); - PColumn column = tableRef.getTable().getColumn(colName); + PColumn column = tableRef.getTable().getColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } catch (TableNotFoundException e) { // Try using the tableName as a columnFamily reference instead ColumnFamilyRef cfRef = resolveColumnFamily(schemaName, tableName); - PColumn column = cfRef.getFamily().getColumn(colName); + PColumn column = cfRef.getFamily().getPColumnForColumnName(colName); return new ColumnRef(cfRef.getTableRef(), column.getPosition()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index e8c05ca..eef604b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; + import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -76,6 +79,8 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; @@ -93,6 +98,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -714,7 +720,7 @@ public class JoinCompiler { if (columnRef.getTableRef().equals(tableRef) && !SchemaUtil.isPKColumn(columnRef.getColumn()) && !(columnRef instanceof LocalIndexColumnRef)) { - scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), columnRef.getColumn().getName().getBytes()); + EncodedColumnsUtil.setColumns(columnRef.getColumn(), tableRef.getTable(), scan); } } } @@ -1284,7 +1290,7 @@ public class JoinCompiler { if (type == JoinType.Full) { for (PColumn c : left.getColumns()) { merged.add(new ProjectedColumn(c.getName(), c.getFamilyName(), - c.getPosition(), true, ((ProjectedColumn) c).getSourceColumnRef())); + c.getPosition(), true, ((ProjectedColumn) c).getSourceColumnRef(), SchemaUtil.isPKColumn(c) ? null : c.getName().getBytes())); } } else { merged.addAll(left.getColumns()); @@ -1294,14 +1300,13 @@ public class JoinCompiler { if (!SchemaUtil.isPKColumn(c)) { PColumn column = new ProjectedColumn(c.getName(), c.getFamilyName(), position++, type == JoinType.Inner ? c.isNullable() : true, - ((ProjectedColumn) c).getSourceColumnRef()); + ((ProjectedColumn) c).getSourceColumnRef(), c.getName().getBytes()); merged.add(column); } } if (left.getBucketNum() != null) { merged.remove(0); } - return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(), PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), @@ -1310,7 +1315,7 @@ public class JoinCompiler { left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), - left.getAutoPartitionSeqName(), left.isAppendOnlySchema()); + left.getAutoPartitionSeqName(), left.isAppendOnlySchema(), ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 2df0671..8265de8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -57,6 +57,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -77,10 +78,11 @@ public class ListJarsQueryPlan implements QueryPlan { static { List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>(); + PName colName = PNameFactory.newName("jar_location"); PColumn column = - new PColumnImpl(PNameFactory.newName("jar_location"), null, + new PColumnImpl(colName, null, PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false, false); + false, null, false, false, colName.getBytes()); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression = http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 393499a..0b3de6e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -50,6 +50,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.TransactionUtil; @@ -176,8 +177,8 @@ public class PostDDLCompiler { @Override public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { PColumn column = tableName != null - ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName) - : tableRef.getTable().getColumn(colName); + ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName) + : tableRef.getTable().getColumnForColumnName(colName); return new ColumnRef(tableRef, column.getPosition()); } @@ -213,6 +214,7 @@ public class PostDDLCompiler { ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts); if (emptyCF != null) { scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst()); } ServerCache cache = null; try { @@ -236,11 +238,12 @@ public class PostDDLCompiler { // data empty column family to stay the same, while the index empty column family // changes. PColumn column = deleteList.get(0); + byte[] cq = column.getColumnQualifierBytes(); if (emptyCF == null) { - scan.addColumn(column.getFamilyName().getBytes(), column.getName().getBytes()); + scan.addColumn(column.getFamilyName().getBytes(), cq); } scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes()); - scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, column.getName().getBytes()); + scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq); } } List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index 81dbe0d..7e3c3b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -31,6 +31,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -77,12 +78,16 @@ public class PostLocalIndexDDLCompiler { // rows per region as a result. The value of the attribute will be our persisted // index maintainers. // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver - scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr)); // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). // However, in this case, we need to project all of the data columns that contribute to the index. IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { - scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + if (index.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + scan.addFamily(columnRef.getFamily()); + } else { + scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + } } // Go through MutationPlan abstraction so that we can create local indexes http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 99a9731..200b06c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -24,11 +24,9 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -44,14 +42,11 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; -import org.apache.phoenix.expression.aggregator.ClientAggregators; -import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.function.ArrayIndexFunction; -import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.expression.visitor.ExpressionVisitor; import org.apache.phoenix.expression.visitor.ProjectedColumnExpressionVisitor; import org.apache.phoenix.expression.visitor.ReplaceArrayFunctionExpressionVisitor; -import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.BindParseNode; @@ -78,6 +73,7 @@ import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; @@ -92,9 +88,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SizedUtil; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** @@ -217,7 +211,7 @@ public class ProjectionCompiler { PColumn indexColumn = null; ColumnRef ref = null; try { - indexColumn = index.getColumn(indexColName); + indexColumn = index.getColumnForColumnName(indexColName); ref = new ColumnRef(tableRef, indexColumn.getPosition()); } catch (ColumnNotFoundException e) { if (index.getIndexType() == IndexType.LOCAL) { @@ -289,7 +283,7 @@ public class ProjectionCompiler { ColumnRef ref = null; String indexColumnFamily = null; try { - indexColumn = index.getColumn(indexColName); + indexColumn = index.getColumnForColumnName(indexColName); ref = new ColumnRef(tableRef, indexColumn.getPosition()); indexColumnFamily = indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString(); } catch (ColumnNotFoundException e) { @@ -484,11 +478,13 @@ public class ProjectionCompiler { } } else { for (byte[] cq : entry.getValue()) { - PColumn column = family.getColumn(cq); - Integer maxLength = column.getMaxLength(); - int byteSize = column.getDataType().isFixedWidth() ? maxLength == null ? column.getDataType().getByteSize() : maxLength : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE; - estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + byteSize; - } + //if (!Bytes.equals(cq, ByteUtil.EMPTY_BYTE_ARRAY) || cq.length > 0) { + PColumn column = family.getPColumnForColumnQualifier(cq); + Integer maxLength = column.getMaxLength(); + int byteSize = column.getDataType().isFixedWidth() ? maxLength == null ? column.getDataType().getByteSize() : maxLength : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE; + estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + byteSize; + } + //} } } boolean isProjectEmptyKeyValue = false; @@ -663,7 +659,14 @@ public class ProjectionCompiler { public Void visit(ProjectedColumnExpression expression) { if (expression.getDataType().isArrayType()) { indexProjectedColumns.add(expression); - KeyValueColumnExpression keyValueColumnExpression = new KeyValueColumnExpression(expression.getColumn()); + PColumn col = expression.getColumn(); + PTable table = context.getCurrentTable().getTable(); + KeyValueColumnExpression keyValueColumnExpression; + if (table.getImmutableStorageScheme() != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + keyValueColumnExpression = new SingleCellColumnExpression(col, col.getName().getString(), table.getEncodingScheme()); + } else { + keyValueColumnExpression = new KeyValueColumnExpression(col); + } indexKVs.add(keyValueColumnExpression); copyOfChildren.set(0, keyValueColumnExpression); Integer count = arrayExpressionCounts.get(expression); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index ed5cda9..8fb435d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -51,6 +51,7 @@ import org.apache.phoenix.parse.TraceStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -75,10 +76,11 @@ public class TraceQueryPlan implements QueryPlan { private static final RowProjector TRACE_PROJECTOR; static { List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>(); + PName colName = PNameFactory.newName(MetricInfo.TRACE.columnName); PColumn column = new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null, PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false, false); + false, null, false, false, colName.getBytes()); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression = http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 4d3c0cf..32e9f68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.phoenix.compile; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import java.sql.SQLException; import java.util.ArrayList; @@ -24,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.FamilyWildcardParseNode; @@ -43,11 +43,13 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; @@ -120,7 +122,7 @@ public class TupleProjectionCompiler { PColumn sourceColumn = table.getPKColumns().get(i); ColumnRef sourceColumnRef = new ColumnRef(tableRef, sourceColumn.getPosition()); PColumn column = new ProjectedColumn(sourceColumn.getName(), sourceColumn.getFamilyName(), - position++, sourceColumn.isNullable(), sourceColumnRef); + position++, sourceColumn.isNullable(), sourceColumnRef, null); projectedColumns.add(column); } for (PColumn sourceColumn : table.getColumns()) { @@ -132,18 +134,18 @@ public class TupleProjectionCompiler { && !families.contains(sourceColumn.getFamilyName().getString())) continue; PColumn column = new ProjectedColumn(sourceColumn.getName(), sourceColumn.getFamilyName(), - position++, sourceColumn.isNullable(), sourceColumnRef); + position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); // Wildcard or FamilyWildcard will be handled by ProjectionCompiler. if (!isWildcard && !families.contains(sourceColumn.getFamilyName())) { - context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), sourceColumn.getName().getBytes()); + EncodedColumnsUtil.setColumns(column, table, context.getScan()); } } // add LocalIndexDataColumnRef for (LocalIndexDataColumnRef sourceColumnRef : visitor.localIndexColumnRefSet) { PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), sourceColumnRef.getColumn().getFamilyName(), position++, - sourceColumnRef.getColumn().isNullable(), sourceColumnRef); + sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes()); projectedColumns.add(column); } @@ -154,9 +156,9 @@ public class TupleProjectionCompiler { null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), - table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); } - + public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { PTable table = tableRef.getTable(); boolean hasSaltingColumn = retainPKColumns && table.getBucketNum() != null; @@ -169,20 +171,23 @@ public class TupleProjectionCompiler { String aliasedName = tableRef.getTableAlias() == null ? SchemaUtil.getColumnName(table.getName().getString(), colName) : SchemaUtil.getColumnName(tableRef.getTableAlias(), colName); - - PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), - retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? - null : PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), - position++, sourceColumn.isNullable(), sourceColumnRef); + PName familyName = SchemaUtil.isPKColumn(sourceColumn) ? (retainPKColumns ? null : PNameFactory.newName(VALUE_COLUMN_FAMILY)) : sourceColumn.getFamilyName(); + PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), familyName, + position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); } + EncodedCQCounter cqCounter = EncodedCQCounter.NULL_COUNTER; + if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { + cqCounter = EncodedCQCounter.copy(table.getEncodedCQCounter()); + } + return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED, null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index bc3466c..e5e18e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -39,6 +39,8 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; @@ -84,11 +86,12 @@ public class UnionCompiler { for (int i = 0; i < plan.getProjector().getColumnCount(); i++) { ColumnProjector colProj = plan.getProjector().getColumnProjector(i); String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias(); + PName colName = PNameFactory.newName(name); PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME, targetTypes.get(i).getType(), targetTypes.get(i).getMaxLength(), targetTypes.get(i).getScale(), colProj.getExpression().isNullable(), i, targetTypes.get(i).getSortOrder(), 500, null, false, - colProj.getExpression().toString(), false, false); + colProj.getExpression().toString(), false, false, colName.getBytes()); projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); @@ -98,7 +101,7 @@ public class UnionCompiler { null, null, projectedColumns, null, null, null, true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L, SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY, - statement.getConnection().getQueryServices().getProps()), null, false); + statement.getConnection().getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 18070d4..7a285a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -748,7 +748,7 @@ public class UpsertCompiler { if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get()); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); @@ -917,10 +917,10 @@ public class UpsertCompiler { UpdateColumnCompiler compiler = new UpdateColumnCompiler(context); int nColumns = onDupKeyPairs.size(); List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns); - LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1); + LinkedHashSet<PColumn> updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1); updateColumns.add(new PColumnImpl( table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others - null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false)); + null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null)); for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) { ColumnName colName = columnPair.getFirst(); PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 39451b8..ed6c6cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter; + import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Collections; @@ -36,6 +38,7 @@ import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter; +import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.RowKeyComparisonFilter; import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter; @@ -46,17 +49,21 @@ import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.SubqueryParseNode; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -169,12 +176,14 @@ public class WhereCompiler { public Expression visit(ColumnParseNode node) throws SQLException { ColumnRef ref = resolveColumn(node); TableRef tableRef = ref.getTableRef(); + Expression newColumnExpression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) { + byte[] cq = tableRef.getTable().getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS + ? QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES : ref.getColumn().getColumnQualifierBytes(); // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs - context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName() - .getBytes()); + context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq); } - return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); + return newColumnExpression; } @Override @@ -195,7 +204,7 @@ public class WhereCompiler { // just use that. try { if (!SchemaUtil.isPKColumn(ref.getColumn())) { - table.getColumn(ref.getColumn().getName().getString()); + table.getColumnForColumnName(ref.getColumn().getName().getString()); } } catch (AmbiguousColumnException e) { disambiguateWithFamily = true; @@ -223,6 +232,7 @@ public class WhereCompiler { } } + public Count getCount() { return count; } @@ -258,6 +268,8 @@ public class WhereCompiler { return null; } }); + QualifierEncodingScheme encodingScheme = context.getCurrentTable().getTable().getEncodingScheme(); + ImmutableStorageScheme storageScheme = context.getCurrentTable().getTable().getImmutableStorageScheme(); switch (counter.getCount()) { case NONE: PTable table = context.getResolver().getTables().get(0).getTable(); @@ -270,7 +282,9 @@ public class WhereCompiler { filter = disambiguateWithFamily ? new SingleCFCQKeyValueComparisonFilter(whereClause) : new SingleCQKeyValueComparisonFilter(whereClause); break; case MULTIPLE: - filter = disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter(whereClause) : new MultiCQKeyValueComparisonFilter(whereClause); + filter = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) ? new MultiEncodedCQKeyValueComparisonFilter( + whereClause, encodingScheme) : (disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter( + whereClause) : new MultiCQKeyValueComparisonFilter(whereClause)); break; } scan.setFilter(filter); http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index ba9f7c8..01498ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; import java.util.List; +import java.util.ListIterator; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -47,11 +48,15 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -77,12 +82,19 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String DELETE_CQ = "_DeleteCQ"; public static final String DELETE_CF = "_DeleteCF"; public static final String EMPTY_CF = "_EmptyCF"; + public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier"; public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex"; public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; + /* + * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation. + * Needed for backward compatibility purposes. TODO: get rid of this in next major release. + */ + public static final String LOCAL_INDEX_BUILD_PROTO = "_LocalIndexBuild"; public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; + public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell"; public static final String VIEW_CONSTANTS = "_ViewConstants"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; @@ -101,6 +113,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public final static String SCAN_OFFSET = "_RowOffset"; public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix"; public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix"; + public final static String MIN_QUALIFIER = "_MinQualifier"; + public final static String MAX_QUALIFIER = "_MaxQualifier"; + public final static String USE_NEW_VALUE_COLUMN_QUALIFIER = "_UseNewValueColumnQualifier"; + public final static String QUALIFIER_ENCODING_SCHEME = "_QualifierEncodingScheme"; + public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme"; + public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations @@ -111,6 +129,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; protected Configuration rawConf; + protected QualifierEncodingScheme encodingScheme; + protected boolean useNewValueColumnQualifier; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -183,6 +203,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); } + this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + this.useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); return s; } @@ -307,14 +329,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param indexMaintainer * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); } /** @@ -332,7 +354,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tx current transaction * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, @@ -340,7 +362,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Transaction tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) { return new RegionScanner() { private boolean hasReferences = checkForReferenceFiles(); @@ -437,11 +459,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier); result.clear(); result.add(tuple.getValue(0)); - if(arrayElementCell != null) + if (arrayElementCell != null) { result.add(arrayElementCell); + } } // There is a scanattribute set to retrieve the specific array element return next; @@ -475,7 +499,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier); result.clear(); result.add(tuple.getValue(0)); if(arrayElementCell != null) @@ -522,7 +547,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, List<Cell> result) { - // make a copy of the results array here, as we're modifying it below + // make a copy of the results array here, as we're modifying it below MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result)); // The size of both the arrays would be same? // Using KeyValueSchema to set and retrieve the value @@ -530,14 +555,15 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Cell rowKv = result.get(0); for (KeyValueColumnExpression kvExp : arrayKVRefs) { if (kvExp.evaluate(tuple, ptr)) { - for (int idx = tuple.size() - 1; idx >= 0; idx--) { - Cell kv = tuple.getValue(idx); + ListIterator<Cell> itr = result.listIterator(); + while (itr.hasNext()) { + Cell kv = itr.next(); if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length, kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()) - && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length, + && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length, kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) { // remove the kv that has the full array values. - result.remove(idx); + itr.remove(); break; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 8cb6dac..0843ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner { delegate.close(); } + @Override public long getMaxResultSize() { return delegate.getMaxResultSize(); } + @Override public boolean next(List<Cell> arg0, int arg1) throws IOException { return delegate.next(arg0, arg1); } + @Override public boolean next(List<Cell> arg0) throws IOException { return delegate.next(arg0); } + @Override public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException { return delegate.nextRaw(arg0, arg1); } + @Override public boolean nextRaw(List<Cell> arg0) throws IOException { return delegate.nextRaw(arg0); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 49e3d71..75c3d43 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -62,9 +63,13 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; @@ -123,14 +128,20 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { RegionScanner innerScanner = s; - byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); - List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); + boolean useProto = false; + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); + useProto = localIndexBytes != null; + if (localIndexBytes == null) { + localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); + } + List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); TupleProjector tupleProjector = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -139,13 +150,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); innerScanner = getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { innerScanner = new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), - c.getEnvironment()); + c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier); } long limit = Long.MAX_VALUE; @@ -377,7 +388,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { estDistVals = Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } - + + Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -388,12 +401,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean success = false; try { boolean hasMore; - - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } - HRegion region = c.getEnvironment().getRegion(); boolean acquiredLock = false; try { @@ -401,7 +412,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there are @@ -436,7 +447,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } } } - + /** * Used for an aggregate query in which the key order match the group by key order. In this * case, we can do the aggregation as we scan, by detecting when the group by key changes. @@ -451,6 +462,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } + final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesPtr currentKey = null; @@ -460,7 +473,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean hasMore; boolean atLimit; boolean aggBoundary = false; - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); ImmutableBytesPtr key = null; Aggregator[] rowAggregators = aggregators.getAggregators(); // If we're calculating no aggregate functions, we can exit at the @@ -473,7 +486,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> kvs = new ArrayList<Cell>(); + List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there http://git-wip-us.apache.org/repos/asf/phoenix/blob/dc6a6fc7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 480ee6d..3044ab0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -42,6 +42,7 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; @@ -61,9 +62,11 @@ public class HashJoinRegionScanner implements RegionScanner { private List<Tuple>[] tempTuples; private ValueBitSet tempDestBitSet; private ValueBitSet[] tempSrcBitSet; + private final boolean useQualifierAsListIndex; + private final boolean useNewValueColumnQualifier; @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env) throws IOException { + public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException { this.env = env; this.scanner = scanner; this.projector = projector; @@ -104,17 +107,18 @@ public class HashJoinRegionScanner implements RegionScanner { this.tempDestBitSet = ValueBitSet.newInstance(joinInfo.getJoinedSchema()); this.projector.setValueBitSet(tempDestBitSet); } + this.useQualifierAsListIndex = useQualifierAsIndex; + this.useNewValueColumnQualifier = useNewValueColumnQualifier; } private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; - - Tuple tuple = new ResultTuple(Result.create(result)); + Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. if (joinInfo.forceProjection()) { - tuple = projector.projectResults(tuple); + tuple = projector.projectResults(tuple, useNewValueColumnQualifier); } if (hasBatchLimit) @@ -146,7 +150,7 @@ public class HashJoinRegionScanner implements RegionScanner { } else { KeyValueSchema schema = joinInfo.getJoinedSchema(); if (!joinInfo.forceProjection()) { // backward compatibility - tuple = projector.projectResults(tuple); + tuple = projector.projectResults(tuple, useNewValueColumnQualifier); } resultQueue.offer(tuple); for (int i = 0; i < count; i++) { @@ -174,7 +178,7 @@ public class HashJoinRegionScanner implements RegionScanner { lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, null, joinInfo.getSchemas()[i], tempSrcBitSet[i], - joinInfo.getFieldPositions()[i]); + joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); resultQueue.offer(joined); continue; } @@ -183,7 +187,7 @@ public class HashJoinRegionScanner implements RegionScanner { lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, t, joinInfo.getSchemas()[i], tempSrcBitSet[i], - joinInfo.getFieldPositions()[i]); + joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); resultQueue.offer(joined); } } @@ -314,7 +318,6 @@ public class HashJoinRegionScanner implements RegionScanner { processResults(result, limit >= 0); result.clear(); } - return nextInQueue(result); } catch (Throwable t) { ServerUtil.throwIOException(env.getRegion().getRegionNameAsString(), t);
