http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index c4eb4f7,33218ee..6f2f72d --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@@ -87,7 -86,11 +88,12 @@@ abstract public class BaseScannerRegion public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; + public static final String TX_STATE = "_TxState"; + public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES"; + public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION"; + public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY"; + public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER"; + /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198. @@@ -225,9 -228,9 +231,9 @@@ final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); } - + /** * Return wrapped scanner that catches unexpected exceptions (i.e. 
Phoenix bugs) and * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query @@@ -247,10 -249,10 +253,11 @@@ final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, - final HRegion dataRegion, final IndexMaintainer indexMaintainer, - Transaction tx, final byte[][] viewConstants, - final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, - final TupleProjector projector, final ImmutableBytesWritable ptr) { + final Region dataRegion, final IndexMaintainer indexMaintainer, ++ Transaction tx, + final byte[][] viewConstants, final KeyValueSchema kvSchema, + final ValueBitSet kvSchemaBitSet, final TupleProjector projector, + final ImmutableBytesWritable ptr) { return new RegionScanner() { @Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index ee724fc,7eb1dc6..ecb1548 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@@ -47,7 -60,7 +60,8 @@@ import static org.apache.phoenix.jdbc.P import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES; + import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES; @@@ -179,26 -215,32 +216,33 @@@ import com.google.protobuf.Service public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor { private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class); + // Column to track tables that have been upgraded based on PHOENIX-2067 + public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE"; + public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE); + // KeyValues for Table - private static final KeyValue TABLE_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES); - private static final KeyValue TABLE_SEQ_NUM_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); - private static final KeyValue COLUMN_COUNT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES); - private static final KeyValue SALT_BUCKETS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES); - private static final KeyValue PK_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES); - private static final KeyValue DATA_TABLE_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); - private static final KeyValue INDEX_STATE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); - private static final KeyValue IMMUTABLE_ROWS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES); - private static final KeyValue VIEW_EXPRESSION_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES); - private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES); - private static final KeyValue DISABLE_WAL_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES); - private static final KeyValue MULTI_TENANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES); - private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES); - private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES); - private static final KeyValue INDEX_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES); - private static 
final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); - private static final KeyValue STORE_NULLS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES); - private static final KeyValue TRANSACTIONAL_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES); - private static final KeyValue EMPTY_KEYVALUE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); + private static final KeyValue TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES); + private static final KeyValue TABLE_SEQ_NUM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); + private static final KeyValue COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES); + private static final KeyValue SALT_BUCKETS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES); + private static final KeyValue PK_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES); + private static final KeyValue DATA_TABLE_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES); + private static final KeyValue INDEX_STATE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES); + private static final KeyValue IMMUTABLE_ROWS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES); + private static final KeyValue VIEW_EXPRESSION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES); + private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES); + private static final 
KeyValue DISABLE_WAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES); + private static final KeyValue MULTI_TENANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES); + private static final KeyValue VIEW_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES); + private static final KeyValue VIEW_INDEX_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES); + private static final KeyValue INDEX_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES); + private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES); + private static final KeyValue STORE_NULLS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORE_NULLS_BYTES); + private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); + private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES); + private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES); ++ private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES); + private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( EMPTY_KEYVALUE_KV, TABLE_TYPE_KV, @@@ -217,8 -259,9 +261,10 @@@ VIEW_INDEX_ID_KV, INDEX_TYPE_KV, INDEX_DISABLE_TIMESTAMP_KV, - TRANSACTIONAL_KV, - STORE_NULLS_KV + STORE_NULLS_KV, + BASE_COLUMN_COUNT_KV, - ROW_KEY_ORDER_OPTIMIZABLE_KV ++ ROW_KEY_ORDER_OPTIMIZABLE_KV, ++ TRANSACTIONAL_KV ); static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); @@@ 
-239,19 -283,21 +286,22 @@@ private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV); private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV); private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV); + private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV); + private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV); + private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV); // KeyValues for Column - private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); - private static final KeyValue COLUMN_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES); - private static final KeyValue NULLABLE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NULLABLE_BYTES); - private static final KeyValue DATA_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES); - private static final KeyValue ORDINAL_POSITION_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES); - private static final KeyValue SORT_ORDER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES); - private static final KeyValue ARRAY_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES); - private static final KeyValue VIEW_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES); - private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES); - private static final KeyValue COLUMN_DEF_KV = 
KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES); + private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); + private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES); + private static final KeyValue NULLABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NULLABLE_BYTES); + private static final KeyValue DATA_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES); + private static final KeyValue ORDINAL_POSITION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES); + private static final KeyValue SORT_ORDER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES); + private static final KeyValue ARRAY_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES); + private static final KeyValue VIEW_CONSTANT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES); + private static final KeyValue IS_VIEW_REFERENCED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES); + private static final KeyValue COLUMN_DEF_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES); + private static final KeyValue IS_ROW_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES); private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList( DECIMAL_DIGITS_KV, COLUMN_SIZE_KV, @@@ -649,10 -844,125 +850,125 @@@ return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null, tableType == INDEX ? 
dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, - disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, transactional, stats); - disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable); ++ disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable, transactional); } - private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, + private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace) + throws IOException, SQLException { + List<Cell> results = Lists.newArrayList(); + scanner.next(results); + if (results.isEmpty()) { + return null; + } + Cell[] functionKeyValues = new Cell[FUNCTION_KV_COLUMNS.size()]; + Cell[] functionArgKeyValues = new Cell[FUNCTION_ARG_KV_COLUMNS.size()]; + // Create PFunction based on KeyValues from scan + Cell keyValue = results.get(0); + byte[] keyBuffer = keyValue.getRowArray(); + int keyLength = keyValue.getRowLength(); + int keyOffset = keyValue.getRowOffset(); + long currentTimeMillis = EnvironmentEdgeManager.currentTimeMillis(); + if(isReplace) { + long deleteTimeStamp = + clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1 + : (keyValue.getTimestamp() < clientTimeStamp ? clientTimeStamp - 1 + : keyValue.getTimestamp()); + deleteMutationsForReplace.add(new Delete(keyBuffer, keyOffset, keyLength, deleteTimeStamp)); + } + PName tenantId = newPName(keyBuffer, keyOffset, keyLength); + int tenantIdLength = (tenantId == null) ? 
0 : tenantId.getBytes().length; + if (tenantIdLength == 0) { + tenantId = null; + } + PName functionName = + newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1); + int functionNameLength = functionName.getBytes().length+1; + int offset = tenantIdLength + functionNameLength + 1; + + long timeStamp = keyValue.getTimestamp(); + + int i = 0; + int j = 0; + while (i < results.size() && j < FUNCTION_KV_COLUMNS.size()) { + Cell kv = results.get(i); + Cell searchKv = FUNCTION_KV_COLUMNS.get(j); + int cmp = + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), + kv.getQualifierLength(), searchKv.getQualifierArray(), + searchKv.getQualifierOffset(), searchKv.getQualifierLength()); + if (cmp == 0) { + timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table + // header row + functionKeyValues[j++] = kv; + i++; + } else if (cmp > 0) { + timeStamp = Math.max(timeStamp, kv.getTimestamp()); + functionKeyValues[j++] = null; + } else { + i++; // shouldn't happen - means unexpected KV in system table header row + } + } + // CLASS_NAME,NUM_ARGS and JAR_PATH are required. 
+ if (functionKeyValues[CLASS_NAME_INDEX] == null || functionKeyValues[NUM_ARGS_INDEX] == null) { + throw new IllegalStateException( + "Didn't find expected key values for function row in metadata row"); + } + + Cell classNameKv = functionKeyValues[CLASS_NAME_INDEX]; + PName className = newPName(classNameKv.getValueArray(), classNameKv.getValueOffset(), + classNameKv.getValueLength()); + Cell jarPathKv = functionKeyValues[JAR_PATH_INDEX]; + PName jarPath = null; + if(jarPathKv != null) { + jarPath = newPName(jarPathKv.getValueArray(), jarPathKv.getValueOffset(), + jarPathKv.getValueLength()); + } + Cell numArgsKv = functionKeyValues[NUM_ARGS_INDEX]; + int numArgs = + PInteger.INSTANCE.getCodec().decodeInt(numArgsKv.getValueArray(), + numArgsKv.getValueOffset(), SortOrder.getDefault()); + Cell returnTypeKv = functionKeyValues[RETURN_TYPE_INDEX]; + PName returnType = + returnTypeKv == null ? null : newPName(returnTypeKv.getValueArray(), + returnTypeKv.getValueOffset(), returnTypeKv.getValueLength()); + + List<FunctionArgument> arguments = Lists.newArrayListWithExpectedSize(numArgs); + for (int k = 0; k < numArgs; k++) { + results.clear(); + scanner.next(results); + if (results.isEmpty()) { + break; + } + Cell typeKv = results.get(0); + if(isReplace) { + long deleteTimeStamp = + clientTimeStamp == HConstants.LATEST_TIMESTAMP ? currentTimeMillis - 1 + : (typeKv.getTimestamp() < clientTimeStamp ? 
clientTimeStamp - 1 + : typeKv.getTimestamp()); + deleteMutationsForReplace.add(new Delete(typeKv.getRowArray(), typeKv + .getRowOffset(), typeKv.getRowLength(), deleteTimeStamp)); + } + int typeKeyLength = typeKv.getRowLength(); + PName typeName = + newPName(typeKv.getRowArray(), typeKv.getRowOffset() + offset, typeKeyLength + - offset - 3); + + int argPositionOffset = offset + typeName.getBytes().length + 1; + short argPosition = Bytes.toShort(typeKv.getRowArray(), typeKv.getRowOffset() + argPositionOffset, typeKeyLength + - argPositionOffset); + addArgumentToFunction(results, functionName, typeName, functionArgKeyValues, arguments, argPosition); + } + Collections.sort(arguments, new Comparator<FunctionArgument>() { + @Override + public int compare(FunctionArgument o1, FunctionArgument o2) { + return o1.getArgPosition() - o2.getArgPosition(); + } + }); + return new PFunction(tenantId, functionName.getString(), arguments, returnType.getString(), + className.getString(), jarPath == null ? 
null : jarPath.getString(), timeStamp); + } + + private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, Region region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; @@@ -768,13 -1217,12 +1223,12 @@@ } } // Load child table next - ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); + ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey); // Get as of latest timestamp so we can detect if we have a newer table that already - // exists - // without making an additional query + // exists without making an additional query PTable table = - loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP); + loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP); - if (table != null) { + if (table != null && !isTableDeleted(table)) { if (table.getTimeStamp() < clientTimeStamp) { // If the table is older than the client time stamp and it's deleted, // continue @@@ -793,17 -1241,19 +1247,18 @@@ return; } } + // Add cell for ROW_KEY_ORDER_OPTIMIZABLE = true, as we know that new tables + // conform the correct row key. The exception is for a VIEW, which the client + // sends over depending on its base physical table. + if (tableType != PTableType.VIEW) { + UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetadata, tableKey, clientTimeStamp); + } // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the - // system - // table. Basically, we get all the locks that we don't already hold for all the - // tableMetadata rows. This ensures we don't have deadlock situations (ensuring - // primary and - // then index table locks are held, in that order). For now, we just don't support - // indexing - // on the system table. This is an issue because of the way we manage batch mutation - // in the - // Indexer. - region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet()); + // system table. 
Basically, we get all the locks that we don't already hold for all the - // tableMetadata rows. This ensures we don't have deadlock situations (ensuring + // primary and then index table locks are held, in that order). For now, we just don't support + // indexing on the system table. This is an issue because of the way we manage batch mutation + // in the Indexer. + region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index e4fd584,54c688a..65a43de --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@@ -190,10 -188,9 +191,10 @@@ public class ScanRegionObserver extend Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan( scan, innerScanner, arrayKVRefs); TupleProjector tupleProjector = null; - HRegion dataRegion = null; + Region dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; + Transaction tx = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@@ -202,10 -199,8 +203,10 @@@ List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); + byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + tx = MutationState.decodeTransaction(txState); } - + final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); innerScanner = http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java index 02b1fa3,25f8271..7e71cd9 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java @@@ -3109,15 -3199,25 +3199,35 @@@ public final class PTableProtos */ boolean getStoreNulls(); - // optional bool transactional = 25; + // optional int32 baseColumnCount = 25; /** - * <code>optional bool transactional = 25;</code> + * <code>optional int32 baseColumnCount = 25;</code> + */ + boolean hasBaseColumnCount(); + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + int getBaseColumnCount(); + + // optional bool rowKeyOrderOptimizable = 26; + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + boolean hasRowKeyOrderOptimizable(); + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + boolean getRowKeyOrderOptimizable(); ++ ++ // optional bool transactional = 27; ++ /** ++ * <code>optional bool transactional = 27;</code> + 
*/ + boolean hasTransactional(); + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + boolean getTransactional(); } /** * Protobuf type {@code PTable} @@@ -3310,9 -3410,14 +3420,19 @@@ } case 200: { bitField0_ |= 0x00100000; + baseColumnCount_ = input.readInt32(); + break; + } + case 208: { + bitField0_ |= 0x00200000; + rowKeyOrderOptimizable_ = input.readBool(); + break; + } ++ case 216: { ++ bitField0_ |= 0x00400000; + transactional_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@@ -3843,22 -3948,38 +3963,54 @@@ return storeNulls_; } - // optional bool transactional = 25; - public static final int TRANSACTIONAL_FIELD_NUMBER = 25; + // optional int32 baseColumnCount = 25; + public static final int BASECOLUMNCOUNT_FIELD_NUMBER = 25; + private int baseColumnCount_; + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public boolean hasBaseColumnCount() { + return ((bitField0_ & 0x00100000) == 0x00100000); + } + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public int getBaseColumnCount() { + return baseColumnCount_; + } + + // optional bool rowKeyOrderOptimizable = 26; + public static final int ROWKEYORDEROPTIMIZABLE_FIELD_NUMBER = 26; + private boolean rowKeyOrderOptimizable_; + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public boolean hasRowKeyOrderOptimizable() { + return ((bitField0_ & 0x00200000) == 0x00200000); + } + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public boolean getRowKeyOrderOptimizable() { + return rowKeyOrderOptimizable_; + } + ++ // optional bool transactional = 27; ++ public static final int TRANSACTIONAL_FIELD_NUMBER = 27; + private boolean transactional_; + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public boolean hasTransactional() { - return ((bitField0_ & 
0x00100000) == 0x00100000); ++ return ((bitField0_ & 0x00400000) == 0x00400000); + } + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public boolean getTransactional() { + return transactional_; + } + private void initFields() { schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY; tableNameBytes_ = com.google.protobuf.ByteString.EMPTY; @@@ -3884,7 -4005,8 +4036,9 @@@ indexType_ = com.google.protobuf.ByteString.EMPTY; statsTimeStamp_ = 0L; storeNulls_ = false; + baseColumnCount_ = 0; + rowKeyOrderOptimizable_ = false; + transactional_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@@ -4025,8 -4147,11 +4179,14 @@@ output.writeBool(24, storeNulls_); } if (((bitField0_ & 0x00100000) == 0x00100000)) { - output.writeBool(25, transactional_); + output.writeInt32(25, baseColumnCount_); + } + if (((bitField0_ & 0x00200000) == 0x00200000)) { + output.writeBool(26, rowKeyOrderOptimizable_); + } ++ if (((bitField0_ & 0x00400000) == 0x00400000)) { ++ output.writeBool(27, transactional_); + } getUnknownFields().writeTo(output); } @@@ -4139,8 -4264,12 +4299,16 @@@ } if (((bitField0_ & 0x00100000) == 0x00100000)) { size += com.google.protobuf.CodedOutputStream - .computeBoolSize(25, transactional_); + .computeInt32Size(25, baseColumnCount_); + } + if (((bitField0_ & 0x00200000) == 0x00200000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(26, rowKeyOrderOptimizable_); + } ++ if (((bitField0_ & 0x00400000) == 0x00400000)) { ++ size += com.google.protobuf.CodedOutputStream ++ .computeBoolSize(27, transactional_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@@ -4272,11 -4401,16 +4440,21 @@@ result = result && (getStoreNulls() == other.getStoreNulls()); } + result = result && (hasBaseColumnCount() == other.hasBaseColumnCount()); + if (hasBaseColumnCount()) { + result = result && 
(getBaseColumnCount() + == other.getBaseColumnCount()); + } + result = result && (hasRowKeyOrderOptimizable() == other.hasRowKeyOrderOptimizable()); + if (hasRowKeyOrderOptimizable()) { + result = result && (getRowKeyOrderOptimizable() + == other.getRowKeyOrderOptimizable()); + } + result = result && (hasTransactional() == other.hasTransactional()); + if (hasTransactional()) { + result = result && (getTransactional() + == other.getTransactional()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@@ -4386,10 -4520,14 +4564,18 @@@ hash = (37 * hash) + STORENULLS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getStoreNulls()); } + if (hasBaseColumnCount()) { + hash = (37 * hash) + BASECOLUMNCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getBaseColumnCount(); + } + if (hasRowKeyOrderOptimizable()) { + hash = (37 * hash) + ROWKEYORDEROPTIMIZABLE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getRowKeyOrderOptimizable()); + } + if (hasTransactional()) { + hash = (37 * hash) + TRANSACTIONAL_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getTransactional()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@@ -4562,8 -4700,10 +4748,12 @@@ bitField0_ = (bitField0_ & ~0x00400000); storeNulls_ = false; bitField0_ = (bitField0_ & ~0x00800000); - transactional_ = false; + baseColumnCount_ = 0; bitField0_ = (bitField0_ & ~0x01000000); + rowKeyOrderOptimizable_ = false; + bitField0_ = (bitField0_ & ~0x02000000); ++ transactional_ = false; ++ bitField0_ = (bitField0_ & ~0x04000000); return this; } @@@ -4707,7 -4847,11 +4897,15 @@@ if (((from_bitField0_ & 0x01000000) == 0x01000000)) { to_bitField0_ |= 0x00100000; } + result.baseColumnCount_ = baseColumnCount_; + if (((from_bitField0_ & 0x02000000) == 0x02000000)) { + to_bitField0_ |= 0x00200000; + } + result.rowKeyOrderOptimizable_ = rowKeyOrderOptimizable_; ++ if (((from_bitField0_ & 0x04000000) == 0x04000000)) { ++ to_bitField0_ |= 
0x00400000; ++ } + result.transactional_ = transactional_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@@ -4874,9 -5018,12 +5072,15 @@@ if (other.hasStoreNulls()) { setStoreNulls(other.getStoreNulls()); } + if (other.hasBaseColumnCount()) { + setBaseColumnCount(other.getBaseColumnCount()); + } + if (other.hasRowKeyOrderOptimizable()) { + setRowKeyOrderOptimizable(other.getRowKeyOrderOptimizable()); + } + if (other.hasTransactional()) { + setTransactional(other.getTransactional()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@@ -6481,39 -6628,72 +6685,105 @@@ return this; } - // optional bool transactional = 25; + // optional int32 baseColumnCount = 25; + private int baseColumnCount_ ; + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public boolean hasBaseColumnCount() { + return ((bitField0_ & 0x01000000) == 0x01000000); + } + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public int getBaseColumnCount() { + return baseColumnCount_; + } + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public Builder setBaseColumnCount(int value) { + bitField0_ |= 0x01000000; + baseColumnCount_ = value; + onChanged(); + return this; + } + /** + * <code>optional int32 baseColumnCount = 25;</code> + */ + public Builder clearBaseColumnCount() { + bitField0_ = (bitField0_ & ~0x01000000); + baseColumnCount_ = 0; + onChanged(); + return this; + } + + // optional bool rowKeyOrderOptimizable = 26; + private boolean rowKeyOrderOptimizable_ ; + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public boolean hasRowKeyOrderOptimizable() { + return ((bitField0_ & 0x02000000) == 0x02000000); + } + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public boolean getRowKeyOrderOptimizable() { + return rowKeyOrderOptimizable_; + } + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public Builder setRowKeyOrderOptimizable(boolean value) { 
+ bitField0_ |= 0x02000000; + rowKeyOrderOptimizable_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool rowKeyOrderOptimizable = 26;</code> + */ + public Builder clearRowKeyOrderOptimizable() { + bitField0_ = (bitField0_ & ~0x02000000); + rowKeyOrderOptimizable_ = false; + onChanged(); + return this; + } + ++ // optional bool transactional = 27; + private boolean transactional_ ; + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public boolean hasTransactional() { - return ((bitField0_ & 0x01000000) == 0x01000000); ++ return ((bitField0_ & 0x04000000) == 0x04000000); + } + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public boolean getTransactional() { + return transactional_; + } + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public Builder setTransactional(boolean value) { - bitField0_ |= 0x01000000; ++ bitField0_ |= 0x04000000; + transactional_ = value; + onChanged(); + return this; + } + /** - * <code>optional bool transactional = 25;</code> ++ * <code>optional bool transactional = 27;</code> + */ + public Builder clearTransactional() { - bitField0_ = (bitField0_ & ~0x01000000); ++ bitField0_ = (bitField0_ & ~0x04000000); + transactional_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PTable) } @@@ -6556,29 -6736,30 +6826,31 @@@ "\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" + "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " + "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" + - "n\030\014 \001(\t\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006" + - "values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(" + - "\003\022\025\n\rkeyBytesCount\030\004 
\001(\003\022\027\n\017guidePostsCo", - "unt\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuideP" + - "osts\"\315\004\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014" + - "\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 " + - "\002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016" + - "sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022" + - "\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022" + - "\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes\030\n " + - "\003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010\022 \n" + - "\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022dataT" + - "ableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyName", - "\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTenan" + - "t\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStateme" + - "nt\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010tenan" + - "tId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tindexT" + - "ype\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\nsto" + - "reNulls\030\030 \001(\010\022\025\n\rtransactional\030\031 \001(\010*A\n\n" + - "PTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW" + - "\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.ph" + - "oenix.coprocessor.generatedB\014PTableProto" + - "sH\001\210\001\001\240\001\001" + "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\"\232\001\n\013PTab" + + "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" + + "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou", + "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" + - "idePosts\030\006 
\001(\0132\014.PGuidePosts\"\357\004\n\006PTable\022" + ++ "idePosts\030\006 \001(\0132\014.PGuidePosts\"\206\005\n\006PTable\022" + + "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" + + "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" + + "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" + + " \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007" + + " \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\013" + + "2\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017" + + "isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(" + + "\0132\014.PTableStats\022\032\n\022dataTableNameBytes\030\r ", + "\001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisabl" + + "eWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewT" + + "ype\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphys" + + "icalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013vie" + + "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" + + "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" + + "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" + - "imizable\030\032 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000" + - "\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020" + - "\004B@\n(org.apache.phoenix.coprocessor.gene", - "ratedB\014PTableProtosH\001\210\001\001\240\001\001" ++ "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(\010*A\n" + ++ "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" + ++ "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p", ++ "hoenix.coprocessor.generatedB\014PTableProt" + ++ 
"osH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@@ -6602,7 -6783,7 +6874,7 @@@ internal_static_PTable_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PTable_descriptor, - new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "Transactional", }); - new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", }); ++ new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", }); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index db50f83,53a13be..50d7617 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@@ -154,20 -156,29 +156,35 @@@ public enum SQLExceptionCode /** * Expression Index exceptions. */ - AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggreagaate expression not allowed in an index"), + AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index"), NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index"), STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index"), + + /** + * Transaction exceptions. + */ + TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to conflict with other mutations"), + TRANSACTION_EXCEPTION(524, "42901", "Transaction aborted due to error"), - /** + /** + * Union All related errors + */ + SELECT_COLUMN_NUM_IN_UNIONALL_DIFFS(525, "42902", "SELECT column number differs in a Union All query is not allowed"), + SELECT_COLUMN_TYPE_IN_UNIONALL_DIFFS(526, "42903", "SELECT column types differ in a Union All query is not allowed"), + + /** + * Row timestamp column related errors + */ + ROWTIMESTAMP_ONE_PK_COL_ONLY(527, "42904", "Only one column that is part of the primary key can be declared as a ROW_TIMESTAMP"), + ROWTIMESTAMP_PK_COL_ONLY(528, "42905", "Only columns part of the primary key can be declared as a ROW_TIMESTAMP"), + ROWTIMESTAMP_CREATE_ONLY(529, "42906", "A column can be added as ROW_TIMESTAMP only in CREATE TABLE"), + ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP"), + ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views"), + 
INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero"), + /** * HBase and Phoenix specific implementation defined sub-classes. * Column family related exceptions. - * + * * For the following exceptions, use errorcode 10. */ SINGLE_PK_MAY_NOT_BE_NULL(1000, "42I00", "Single column primary key may not be NULL."), @@@ -242,16 -253,11 +259,18 @@@ CANNOT_ALTER_PROPERTY(1051, "43A08", "Property can be specified or changed only when creating a table"), CANNOT_SET_PROPERTY_FOR_COLUMN_NOT_ADDED(1052, "43A09", "Property cannot be specified for a column family that is not being added or modified"), CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN(1053, "43A10", "Table level property cannot be set when adding a column"), - - NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported for HBase versions " + + + NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported for HBase versions " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW) + " through " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW) + " inclusive."), UNALLOWED_LOCAL_INDEXES(1055, "43A12", "Local secondary indexes are configured to not be allowed."), + - DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1056, "43A13", "Default column family not allowed on VIEW or shared INDEX"), + DESC_VARBINARY_NOT_SUPPORTED(1056, "43A13", "Descending VARBINARY columns not supported"), ++ ++ DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1057, "43A13", "Default column family not allowed on VIEW or shared INDEX"), + ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"), + MAY_NOT_MAP_TO_EXISTING_TABLE_AS_TRANSACTIONAL(1071, "44A02", "An existing HBase table may not be mapped to as a transactional table"), + STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"), + CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", 
"Cannot start a transaction on a connection with SCN set"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 91098d3,e873df7..bc6dd23 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@@ -40,14 -42,17 +42,18 @@@ import org.apache.phoenix.compile.Query import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; + import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; + import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.index.IndexMaintainer; + import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.DelegateResultIterator; import org.apache.phoenix.iterate.ParallelIteratorFactory; + import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; @@@ -67,10 -72,7 +73,9 @@@ import org.apache.phoenix.util.LogUtil import 
org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.TransactionUtil; - import org.cloudera.htrace.TraceScope; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@@ -111,14 -118,9 +123,15 @@@ public abstract class BaseQueryPlan imp this.orderBy = orderBy; this.groupBy = groupBy; this.parallelIteratorFactory = parallelIteratorFactory; + this.dynamicFilter = dynamicFilter; } + + @Override + public Operation getOperation() { + return Operation.QUERY; + } + @Override public boolean isDegenerate() { return context.getScanRanges() == ScanRanges.NOTHING; @@@ -187,27 -202,41 +218,43 @@@ scan.setSmall(true); } - // Set producer on scan so HBase server does round robin processing - //setProducer(scan); - // Set the time range on the scan so we don't get back rows newer than when the statement was compiled - // The time stamp comes from the server at compile time when the meta data - // is resolved. - // TODO: include time range in explain plan? - PTable table = context.getCurrentTable().getTable(); PhoenixConnection connection = context.getConnection(); - // Timestamp is managed by Transaction Manager for transactional tables + + // set read consistency + if (table.getType() != PTableType.SYSTEM) { + scan.setConsistency(connection.getConsistency()); + } - // Get the time range of row_timestamp column - TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange(); - // Get the already existing time range on the scan. 
- TimeRange scanTimeRange = scan.getTimeRange(); - Long scn = connection.getSCN(); - if (scn == null) { - scn = context.getCurrentTime(); - } - try { - TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn); - if (timeRangeToUse == null) { - return ResultIterator.EMPTY_ITERATOR; - } - scan.setTimeRange(timeRangeToUse.getMin(), timeRangeToUse.getMax()); - } catch (IOException e) { - throw new RuntimeException(e); - } + if (!table.isTransactional()) { - if (context.getScanTimeRange() == null) { - Long scn = connection.getSCN(); - if (scn == null) { - scn = context.getCurrentTime(); - } - ScanUtil.setTimeRange(scan, scn); - } else { - ScanUtil.setTimeRange(scan, context.getScanTimeRange()); - } ++ // Get the time range of row_timestamp column ++ TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange(); ++ // Get the already existing time range on the scan. ++ TimeRange scanTimeRange = scan.getTimeRange(); ++ Long scn = connection.getSCN(); ++ if (scn == null) { ++ scn = context.getCurrentTime(); ++ } ++ try { ++ TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn); ++ if (timeRangeToUse == null) { ++ return ResultIterator.EMPTY_ITERATOR; ++ } ++ scan.setTimeRange(timeRangeToUse.getMin(), timeRangeToUse.getMax()); ++ } catch (IOException e) { ++ throw new RuntimeException(e); ++ } ++ } + byte[] tenantIdBytes; + if( table.isMultiTenant() == true ) { + tenantIdBytes = connection.getTenantId() == null ? null : + ScanUtil.getTenantIdBytes( + table.getRowKeySchema(), + table.getBucketNum()!=null, + connection.getTenantId()); + } else { + tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); } - ScanUtil.setTenantId(scan, connection.getTenantId() == null ? 
null : connection.getTenantId().getBytes()); + + ScanUtil.setTenantId(scan, tenantIdBytes); String customAnnotations = LogUtil.customAnnotationsToString(connection); ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes()); // Set local index related scan attributes. http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java index 07bdcc8,0000000..7eb3c3a mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java @@@ -1,282 -1,0 +1,300 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class DelegateHTableInterface implements HTableInterface { + protected final HTableInterface delegate; + + public DelegateHTableInterface(HTableInterface delegate) { + this.delegate = delegate; + } + + @Override + public byte[] getTableName() { + return delegate.getTableName(); + } + + @Override + public TableName getName() { + return delegate.getName(); + } + + @Override + public Configuration getConfiguration() { + return delegate.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return delegate.getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return delegate.exists(get); + } + + @Override + public 
Boolean[] exists(List<Get> gets) throws IOException { + return delegate.exists(gets); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + delegate.batch(actions, results); + } + + @SuppressWarnings("deprecation") + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + return delegate.batch(actions); + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback) + throws IOException, InterruptedException { + delegate.batchCallback(actions, results, callback); + } + + @SuppressWarnings("deprecation") + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> callback) throws IOException, + InterruptedException { + return delegate.batchCallback(actions, callback); + } + + @Override + public Result get(Get get) throws IOException { + return delegate.get(get); + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + return delegate.get(gets); + } + + @SuppressWarnings("deprecation") + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return delegate.getRowOrBefore(row, family); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return delegate.getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return delegate.getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + return delegate.getScanner(family, qualifier); + } + + @Override + public void put(Put put) throws IOException { + delegate.put(put); + } + + @Override + public void put(List<Put> puts) throws IOException { + delegate.put(puts); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + return 
delegate.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public void delete(Delete delete) throws IOException { + delegate.delete(delete); + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + delegate.delete(deletes); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) + throws IOException { + return delegate.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + delegate.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return delegate.append(append); + } + + @Override + public Result increment(Increment increment) throws IOException { + return delegate.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, durability); + } + + @SuppressWarnings("deprecation") + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + } + + @Override + public boolean isAutoFlush() { + return delegate.isAutoFlush(); + } + + @Override + public void flushCommits() throws IOException { + delegate.flushCommits(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return delegate.coprocessorService(row); + } + + @Override + public <T extends Service, R> Map<byte[], 
R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + return delegate.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable { + delegate.coprocessorService(service, startKey, endKey, callable, callback); + } + + @SuppressWarnings("deprecation") + @Override + public void setAutoFlush(boolean autoFlush) { + delegate.setAutoFlush(autoFlush); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + delegate.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + delegate.setAutoFlushTo(autoFlush); + } + + @Override + public long getWriteBufferSize() { + return delegate.getWriteBufferSize(); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + delegate.setWriteBufferSize(writeBufferSize); + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService(MethodDescriptor methodDescriptor, + Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + return delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); + } + + @Override + public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) throws ServiceException, + Throwable { + delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, + RowMutations mutation) throws IOException { + return 
delegate.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + ++ @Override ++ public boolean[] existsAll(List<Get> gets) throws IOException { ++ return delegate.existsAll(gets); ++ } ++ ++ @Override ++ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, ++ CompareOp compareOp, byte[] value, Put put) throws IOException { ++ /* Forward compareOp to the delegate's matching overload, as checkAndDelete and checkAndMutate do; calling the overload without it would discard the caller's comparison operator. */ return delegate.checkAndPut(row, family, qualifier, compareOp, value, put); ++ } ++ ++ @Override ++ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, ++ CompareOp compareOp, byte[] value, Delete delete) ++ throws IOException { ++ return delegate.checkAndDelete(row, family, qualifier, compareOp, value, delete); ++ } ++ +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 5c1487c,7026433..e35851d --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@@ -18,8 -18,8 +18,9 @@@ package org.apache.phoenix.execute; import java.sql.ParameterMetaData; + import java.sql.SQLException; import java.util.List; +import java.util.Set; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@@ -108,10 -102,10 +109,15 @@@ public abstract class DelegateQueryPla public boolean isRowKeyOrdered() { return delegate.isRowKeyOrdered(); } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return delegate.useRoundRobinIterator(); + } + @Override + public Operation getOperation() { + return delegate.getOperation(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 9322378,72920b2..cf89380 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@@ -81,10 -83,9 +86,10 @@@ public class HashJoinPlan extends Deleg private final HashJoinInfo joinInfo; private final SubPlan[] subPlans; private final boolean recompileWhereClause; + private final Set<TableRef> tableRefs; + private final int maxServerCacheTimeToLive; private List<SQLCloseable> dependencies; private HashCacheClient hashClient; - private int maxServerCacheTimeToLive; private AtomicLong firstJobEndTime; private List<Expression> keyRangeExpressions; @@@ -113,20 -114,17 +118,27 @@@ this.joinInfo = joinInfo; this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; + this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size()); + this.tableRefs.addAll(plan.getSourceRefs()); + for (SubPlan subPlan : subPlans) { + tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs()); + } + this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); } - + @Override + public Set<TableRef> getSourceRefs() { + return tableRefs; + } + + @Override public ResultIterator iterator() throws SQLException { + return iterator(DefaultParallelScanGrouper.getInstance()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { int count = subPlans.length; PhoenixConnection connection = getContext().getConnection(); ConnectionQueryServices services = connection.getQueryServices();
