http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index bc3466c..e884439 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -39,6 +39,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; @@ -88,17 +89,18 @@ public class UnionCompiler { UNION_FAMILY_NAME, targetTypes.get(i).getType(), targetTypes.get(i).getMaxLength(), targetTypes.get(i).getScale(), colProj.getExpression().isNullable(), i, targetTypes.get(i).getSortOrder(), 500, null, false, - colProj.getExpression().toString(), false, false); + colProj.getExpression().toString(), false, false, null); projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); + // TODO: samarth this is likely just an in memory reference for compilation purposes. Probably ok to pass non-encoded scheme and null counter. PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null, projectedColumns, null, null, null, true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L, SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY, - statement.getConnection().getQueryServices().getProps()), null, false); + statement.getConnection().getQueryServices().getProps()), null, false, StorageScheme.NON_ENCODED_COLUMN_NAMES, PTable.EncodedCQCounter.NULL_COUNTER); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 13963d7..63ad9c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.expression.AndExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -51,11 +53,13 @@ import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -168,12 +172,14 @@ public class WhereCompiler { public Expression visit(ColumnParseNode node) throws SQLException { ColumnRef ref = resolveColumn(node); TableRef tableRef = ref.getTableRef(); + ColumnExpression newColumnExpression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) { + byte[] cq = tableRef.getTable().getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL + ? ref.getColumn().getFamilyName().getBytes() : EncodedColumnsUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable()); // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs - context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName() - .getBytes()); + context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq); } - return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); + return newColumnExpression; } @Override @@ -194,7 +200,7 @@ public class WhereCompiler { // just use that. try { if (!SchemaUtil.isPKColumn(ref.getColumn())) { - table.getColumn(ref.getColumn().getName().getString()); + table.getPColumnForColumnName(ref.getColumn().getName().getString()); } } catch (AmbiguousColumnException e) { disambiguateWithFamily = true; @@ -222,6 +228,22 @@ public class WhereCompiler { } } + + public void increment(ArrayColumnExpression column) { + switch (count) { + case NONE: + count = Count.SINGLE; + this.column = column.getArrayExpression(); + break; + case SINGLE: + count = column.getArrayExpression().equals(this.column) ? Count.SINGLE : Count.MULTIPLE; + break; + case MULTIPLE: + break; + + } + } + public Count getCount() { return count; } @@ -256,6 +278,12 @@ public class WhereCompiler { counter.increment(expression); return null; } + + @Override + public Void visit(ArrayColumnExpression expression) { + counter.increment(expression); + return null; + } }); switch (counter.getCount()) { case NONE: http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 5320971..64e5efd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -50,6 +50,8 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; @@ -77,12 +79,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String DELETE_CQ = "_DeleteCQ"; public static final String DELETE_CF = "_DeleteCF"; public static final String EMPTY_CF = "_EmptyCF"; + public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier"; public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex"; public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; + public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell"; public static final String VIEW_CONSTANTS = "_ViewConstants"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; @@ -100,6 +104,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public final static String SCAN_OFFSET = "_RowOffset"; public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix"; public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix"; + public final static String MIN_QUALIFIER = "_MinQualifier"; + public final static String MAX_QUALIFIER = "_MaxQualifier"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations @@ -306,14 +312,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param indexMaintainer * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); } /** @@ -331,7 +337,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tx current transaction * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, @@ -339,7 +345,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Transaction tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) { return new RegionScanner() { private boolean hasReferences = checkForReferenceFiles(); @@ -436,11 +442,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + // TODO: samarth think if this is the right thing to do here. + Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); - if(arrayElementCell != null) + if (arrayElementCell != null) { result.add(arrayElementCell); + } } // There is a scanattribute set to retrieve the specific array element return next; @@ -474,7 +483,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); if(arrayElementCell != null) @@ -527,24 +537,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // Using KeyValueSchema to set and retrieve the value // collect the first kv to get the row Cell rowKv = result.get(0); - for (KeyValueColumnExpression kvExp : arrayKVRefs) { - if (kvExp.evaluate(tuple, ptr)) { - for (int idx = tuple.size() - 1; idx >= 0; idx--) { - Cell kv = tuple.getValue(idx); - if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length, - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()) - && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length, - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) { - // remove the kv that has the full array values. - result.remove(idx); - break; - } - } - } - } byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs, kvSchemaBitSet, ptr); // Add a dummy kv with the exact value of the array index + // TODO: samarth how does this dummy column qualifier play with encoded column names result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(), QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length, QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0, http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 8cb6dac..0843ba2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner { delegate.close(); } + @Override public long getMaxResultSize() { return delegate.getMaxResultSize(); } + @Override public boolean next(List<Cell> arg0, int arg1) throws IOException { return delegate.next(arg0, arg1); } + @Override public boolean next(List<Cell> arg0) throws IOException { return delegate.next(arg0); } + @Override public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException { return delegate.nextRaw(arg0, arg1); } + @Override public boolean nextRaw(List<Cell> arg0) throws IOException { return delegate.nextRaw(arg0); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 49e3d71..d21508a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_ESTIMATED_DISTINCT_ import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_GROUPBY_SPILLABLE; +import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -62,7 +64,10 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.IndexUtil; @@ -131,6 +136,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null); if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -139,7 +145,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); innerScanner = getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { @@ -155,9 +161,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } if (keyOrdered) { // Optimize by taking advantage that the rows are // already in the required group by key order - return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit); + return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, j != null); } else { // Otherwse, collect them all up in an in memory map - return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit); + return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, j != null); } } @@ -363,7 +369,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { */ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, long limit) throws IOException { + final ServerAggregators aggregators, long limit, boolean isJoin) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); @@ -377,7 +383,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { estDistVals = Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } - + Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -388,12 +395,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean success = false; try { boolean hasMore; - - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } - HRegion region = c.getEnvironment().getRegion(); boolean acquiredLock = false; try { @@ -401,7 +406,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there are @@ -436,7 +441,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } } } - + /** * Used for an aggregate query in which the key order match the group by key order. In this * case, we can do the aggregation as we scan, by detecting when the group by key changes. @@ -445,12 +450,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, final long limit) throws IOException { + final ServerAggregators aggregators, final long limit, final boolean isJoin) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } + final Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); + final boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin); return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesPtr currentKey = null; @@ -460,7 +467,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean hasMore; boolean atLimit; boolean aggBoundary = false; - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); ImmutableBytesPtr key = null; Aggregator[] rowAggregators = aggregators.getAggregators(); // If we're calculating no aggregate functions, we can exit at the @@ -473,7 +480,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { acquiredLock = true; synchronized (scanner) { do { - List<Cell> kvs = new ArrayList<Cell>(); + List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there @@ -511,6 +518,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(), currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); + //TODO: samarth aaha how do we handle this. It looks like we are adding stuff like this to the results + // that we are returning. Bounded skip null cell list won't handle this properly. Interesting. So how do we + // handle this. Does having a reserved set of column qualifiers help here? results.add(keyValue); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: " http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 480ee6d..1b55c0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -109,7 +109,7 @@ public class HashJoinRegionScanner implements RegionScanner { private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; - + //TODO: samarth make joins work with position based lookup. Tuple tuple = new ResultTuple(Result.create(result)); // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 5ab42b9..a7247e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES; @@ -34,6 +35,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES; @@ -57,6 +59,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX; @@ -190,8 +193,10 @@ import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.LinkType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; @@ -209,10 +214,12 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -282,6 +289,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES); private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES); private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES); + private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES); private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( EMPTY_KEYVALUE_KV, @@ -308,7 +316,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso UPDATE_CACHE_FREQUENCY_KV, IS_NAMESPACE_MAPPED_KV, AUTO_PARTITION_SEQ_KV, - APPEND_ONLY_SCHEMA_KV + APPEND_ONLY_SCHEMA_KV, + STORAGE_SCHEME_KV ); static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -338,6 +347,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV); private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV); private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV); + private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); @@ -351,6 +361,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES); private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES); private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES); + private static final KeyValue ENCODED_COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODED_COLUMN_QUALIFIER_BYTES); private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList( DECIMAL_DIGITS_KV, COLUMN_SIZE_KV, @@ -363,11 +374,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso VIEW_CONSTANT_KV, IS_VIEW_REFERENCED_KV, COLUMN_DEF_KV, - IS_ROW_TIMESTAMP_KV + IS_ROW_TIMESTAMP_KV, + ENCODED_COLUMN_QUALIFIER_KV ); static { Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR); } + private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES); private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV); private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV); private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV); @@ -379,9 +392,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV); private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV); private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV); + private static final int ENCODED_COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(ENCODED_COLUMN_QUALIFIER_KV); private static final int LINK_TYPE_INDEX = 0; - + private static final KeyValue CLASS_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, CLASS_NAME_BYTES); private static final KeyValue JAR_PATH_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, JAR_PATH_BYTES); private static final KeyValue RETURN_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, RETURN_TYPE_BYTES); @@ -717,8 +731,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(), isRowTimestampKV.getValueLength())); - - PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false); + Cell columnQualifierKV = colKeyValues[ENCODED_COLUMN_QUALIFIER_INDEX]; + Integer columnQualifier = columnQualifierKV == null ? null : + PInteger.INSTANCE.getCodec().decodeInt(columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(), SortOrder.getDefault()); + PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier); columns.add(column); } @@ -926,37 +942,50 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(), isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength())); - - + Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX]; + //TODO: change this once we start having other values for storage schemes + StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme + .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), + storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength())); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = Lists.newArrayList(); List<PName> physicalTables = Lists.newArrayList(); PName parentTableName = tableType == INDEX ? dataTableName : null; PName parentSchemaName = tableType == INDEX ? schemaName : null; + EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter(); while (true) { - results.clear(); - scanner.next(results); - if (results.isEmpty()) { - break; - } - Cell colKv = results.get(LINK_TYPE_INDEX); - int colKeyLength = colKv.getRowLength(); - PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); - int colKeyOffset = offset + colName.getBytes().length + 1; - PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); - if (colName.getString().isEmpty() && famName != null) { - LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); - if (linkType == LinkType.INDEX_TABLE) { - addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); - } else if (linkType == LinkType.PHYSICAL_TABLE) { - physicalTables.add(famName); - } else if (linkType == LinkType.PARENT_TABLE) { - parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); - parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); - } - } else { - addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); - } + results.clear(); + scanner.next(results); + if (results.isEmpty()) { + break; + } + //TODO: samarth remember why we have colums present at LINK_TYPE_INDEX + Cell colKv = results.get(LINK_TYPE_INDEX); + if (colKv != null) { + int colKeyLength = colKv.getRowLength(); + PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset); + int colKeyOffset = offset + colName.getBytes().length + 1; + PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); + if (isQualifierCounterKV(colKv)) { + Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC); + cqCounter.setValue(famName.getString(), value); + } else { + if (colName.getString().isEmpty() && famName != null) { + LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); + if (linkType == LinkType.INDEX_TABLE) { + addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); + } else if (linkType == LinkType.PHYSICAL_TABLE) { + physicalTables.add(famName); + } else if (linkType == LinkType.PARENT_TABLE) { + parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes())); + parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes())); + } + } else { + addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); + } + } + } } // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote // server while holding this lock is a bad idea and likely to cause contention. @@ -964,9 +993,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount, - indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema); + indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, cqCounter); } - + + private boolean isQualifierCounterKV(Cell kv) { + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(), + QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength()); + return cmp == 0; + } + private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException { List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -1991,7 +2028,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return result; } region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); - // Invalidate from cache + // Invalidate from cache. //TODO: samarth should we invalidate the base table from the cache too here. for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); } @@ -2164,6 +2201,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[][] rkmd = new byte[5][]; int pkCount = getVarChars(m.getRow(), rkmd); if (pkCount > COLUMN_NAME_INDEX + && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0 && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0 && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) { columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES))); @@ -2198,8 +2236,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]); String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]); try { - existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily( - columnFamily).getColumn(columnName); + existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily( + columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to be added. } catch (ColumnNotFoundException e) { @@ -2326,26 +2364,26 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso columnsAddedToBaseTable++; } } - /* - * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base - * table pk columns 2. if we are adding all the existing view pk columns to the base table - */ - if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) { - return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); - } - addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view, - deltaNumPkColsSoFar); - - /* - * Increment the sequence number by 1 if: - * 1) For a diverged view, there were columns (pk columns) added to the view. - * 2) For a non-diverged view if the base column count changed. - */ - boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0) - || (!isDivergedView(view) && columnsAddedToBaseTable > 0); - updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, - invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable, - viewKey, view, ordinalPositionList, numCols, changeSequenceNumber); + /* + * Allow adding a pk columns to base table : 1. if all the view pk columns are exactly the same as the base + * table pk columns 2. if we are adding all the existing view pk columns to the base table + */ + if (addingExistingPkCol && !viewPkCols.equals(basePhysicalTable.getPKColumns())) { + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } + addViewIndexesHeaderRowMutations(mutationsForAddingColumnsToViews, invalidateList, clientTimeStamp, view, + deltaNumPkColsSoFar); + + /* + * Increment the sequence number by 1 if: + * 1) For a diverged view, there were columns (pk columns) added to the view. + * 2) For a non-diverged view if the base column count changed. + */ + boolean changeSequenceNumber = (isDivergedView(view) && columnsAddedToView > 0) + || (!isDivergedView(view) && columnsAddedToBaseTable > 0); + updateViewHeaderRow(basePhysicalTable, tableMetadata, mutationsForAddingColumnsToViews, + invalidateList, clientTimeStamp, columnsAddedToView, columnsAddedToBaseTable, + viewKey, view, ordinalPositionList, numCols, changeSequenceNumber); } return null; } @@ -2503,8 +2541,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily); try { existingViewColumn = - columnFamily == null ? view.getColumn(columnName) : view - .getColumnFamily(columnFamily).getColumn(columnName); + columnFamily == null ? view.getPColumnForColumnName(columnName) : view + .getColumnFamily(columnFamily).getPColumnForColumnName(columnName); } catch (ColumnFamilyNotFoundException e) { // ignore since it means that the column family is not present for the column to // be added. @@ -2570,7 +2608,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) { if (existingViewColumn != null) { - + if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) { + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable); + } // Validate data type is same int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES); if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) { @@ -2813,7 +2853,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) { PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { addingPKColumn = true; @@ -3066,7 +3106,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PColumnFamily family = table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]); columnToDelete = - family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); + family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]); } else if (pkCount > COLUMN_NAME_INDEX && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) { deletePKColumn = true; @@ -3155,10 +3195,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] indexKey = SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index .getTableName().getBytes()); + Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString()); + boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo); + boolean isCoveredColumn = indexMaintainer.getCoveredColumnInfo().contains(columnToDeleteInfo); // If index requires this column for its pk, then drop it - if (indexMaintainer.getIndexedColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + if (isColumnIndexed) { // Since we're dropping the index, lock it to ensure // that a change in index state doesn't // occur while we're dropping it. @@ -3179,9 +3220,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso invalidateList.add(new ImmutableBytesPtr(indexKey)); } // If the dropped column is a covered index column, invalidate the index - else if (indexMaintainer.getCoveredColumns().contains( - new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete - .getName().getBytes()))) { + else if (isCoveredColumn){ invalidateList.add(new ImmutableBytesPtr(indexKey)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 3cfe790..5cfb102 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -107,7 +107,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } } - public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { + private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s, boolean isJoin) { byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN); if (topN == null) { return null; @@ -125,7 +125,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { orderByExpression.readFields(input); orderByExpressions.add(orderByExpression); } - ResultIterator inner = new RegionScannerResultIterator(s); + ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin); return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize); } catch (IOException e) { @@ -218,10 +218,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + //TODO: samarth get rid of this join shit. Joins should support position based look up. + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, - viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex); final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { @@ -229,10 +231,10 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } if (scanOffset != null) { innerScanner = getOffsetScanner(c, innerScanner, - new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset), + new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan), j != null), scanOffset), scan.getAttribute(QueryConstants.LAST_SCAN) != null); } - final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); + final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner, j != null); if (iterator == null) { return innerScanner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 2e2d580..89ccff0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -80,6 +80,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE"; private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS)); + //TODO: samarth verify that it is ok to send non-encoded empty column here. Probably is. private static Result getErrorResult(byte[] row, long timestamp, int errorCode) { byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()]; PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0); http://git-wip-us.apache.org/repos/asf/phoenix/blob/23a87989/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a312020..3129ef8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.util.ScanUtil.getMinMaxQualifiersFromScan; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -49,10 +50,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; @@ -98,7 +96,10 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; @@ -114,6 +115,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TimeKeeper; +import org.apache.tephra.TxConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,8 +123,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.tephra.TxConstants; - /** * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). @@ -300,6 +300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] deleteCQ = null; byte[] deleteCF = null; byte[] emptyCF = null; + byte[] emptyKVQualifier = null; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (upsertSelectTable != null) { isUpsert = true; @@ -315,12 +316,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ); } emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF); + emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER);//TODO: samarth check this } TupleProjector tupleProjector = null; byte[][] viewConstants = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -329,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, - c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { @@ -369,7 +372,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Aggregator[] rowAggregators = aggregators.getAggregators(); boolean hasMore; boolean hasAny = false; - MultiKeyValueTuple result = new MultiKeyValueTuple(); + Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } @@ -386,7 +390,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver acquiredLock = true; synchronized (innerScanner) { do { - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned @@ -494,7 +498,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, results.get(0).getTimestamp(), env.getRegion().getRegionInfo().getStartKey(), - env.getRegion().getRegionInfo().getEndKey()); + env.getRegion().getRegionInfo().getEndKey(), false); indexMutations.add(put); } } @@ -589,8 +593,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (!timeStamps.contains(kvts)) { Put put = new Put(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); - put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, - ByteUtil.EMPTY_BYTE_ARRAY); + // FIXME: Use the right byte array value. Transactional tables can't + // have empty byte arrays since Tephra seems them as delete markers. + put.add(emptyCF, emptyKVQualifier != null ? emptyKVQualifier + : QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY); mutations.add(put); } }