This is an automated email from the ASF dual-hosted git repository. apurtell pushed a commit to branch PHOENIX-7562-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit a5ea1189424913af098a5bbd55cdee035f46f9ba Author: Jacob Isaac <jacobpisaa...@gmail.com> AuthorDate: Thu Apr 24 17:12:46 2025 -0700 PHOENIX-7107 Add support for indexing on SYSTEM.CATALOG table (#2048) --- .../phoenix/compile/CreateIndexCompiler.java | 38 + .../apache/phoenix/exception/SQLExceptionCode.java | 4 + .../apache/phoenix/expression/ExpressionType.java | 3 +- .../function/DecodeViewIndexIdFunction.java | 183 ++++ .../apache/phoenix/parse/CreateTableStatement.java | 10 +- .../phoenix/parse/DecodeViewIndexIdParseNode.java | 79 ++ .../phoenix/query/ConnectionQueryServicesImpl.java | 26 +- .../org/apache/phoenix/query/QueryServices.java | 1 + .../apache/phoenix/query/QueryServicesOptions.java | 2 + .../org/apache/phoenix/schema/MetaDataClient.java | 4 +- .../phoenix/coprocessor/MetaDataEndpointImpl.java | 166 +++- .../phoenix/coprocessor/TaskRegionObserver.java | 2 + .../phoenix/end2end/BaseRowKeyMatcherTestIT.java | 59 +- .../end2end/index/PartialSystemCatalogIndexIT.java | 988 +++++++++++++++++++++ .../TestTrackingParallelWriterIndexCommitter.java | 117 +++ .../jdbc/HighAvailabilityTestingUtility.java | 5 + .../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 128 ++- .../parse/DecodeViewIndexIdFunctionTest.java | 59 ++ .../java/org/apache/phoenix/query/BaseTest.java | 4 + 19 files changed, 1813 insertions(+), 65 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java index c98f9a6a5c..bcdae4013d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.compile; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.client.Scan; 
@@ -32,6 +33,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.parse.CreateIndexStatement; +import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor; import org.apache.phoenix.parse.SubqueryParseNode; @@ -73,6 +75,10 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import static org.apache.phoenix.query.QueryServices.DEFAULT_SYSTEM_KEEP_DELETED_CELLS_ATTRIB; +import static org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED; + public class CreateIndexCompiler { private final PhoenixStatement statement; private final Operation operation; @@ -230,6 +236,7 @@ public class CreateIndexCompiler { } public MutationPlan compile(final CreateIndexStatement create) throws SQLException { final PhoenixConnection connection = statement.getConnection(); + verifyDataTable(connection, create.getTable()); final ColumnResolver resolver = FromCompiler.getResolverForCreateIndex( create, connection, create.getUdfParseNodes()); @@ -278,4 +285,35 @@ public class CreateIndexCompiler { }; } + + /** + * Helper method to validate CREATE INDEX statements on SYSTEM tables. + * 1. Pass if schema name is not provided; the assumption is that it is not a SYSTEM table. + * 2. Fail if SYSTEM_CATALOG_INDEXES_ENABLED is not enabled + * 3. 
Fail if table other than SYSTEM.CATALOG + * + * @param connection + * @param table + * @throws SQLException + */ + private void verifyDataTable(PhoenixConnection connection, NamedTableNode table) throws SQLException { + Configuration conf = connection.getQueryServices().getConfiguration(); + boolean catalogIndexesEnabled = conf.getBoolean(SYSTEM_CATALOG_INDEXES_ENABLED, DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED); + + TableName tableName = table.getName(); + if (tableName.getSchemaName() == null) { + return; + } + if (tableName.getSchemaName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME) && + !catalogIndexesEnabled) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SYSTEM_TABLE_INDEXES_NOT_ENABLED). + build().buildException(); + } + + if (tableName.getSchemaName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME) && + !tableName.getTableName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_INDEX_SYSTEM_TABLE). + build().buildException(); + } + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index a2559c2f2f..f1fdbce009 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -113,6 +113,10 @@ public enum SQLExceptionCode { " Index where clause cannot include a subquery."), CANNOT_EVALUATE_INDEX_WHERE(304, "23102", "Invalid index where clause. It cannot be evaluated on a data table row."), + SYSTEM_TABLE_INDEXES_NOT_ENABLED(305, "23103", + "Invalid index on table. Indexes on SYSTEM tables are not enabled."), + CANNOT_INDEX_SYSTEM_TABLE(306, "23104", + "Invalid index on table. 
SYSTEM Indexes can only be on SYSTEM.CATALOG table."), /** * Invalid Cursor State (errorcode 04, sqlstate 24) */ diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java index 1fb68d81ae..453925ef5d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -202,7 +202,8 @@ public enum ExpressionType { BsonValueFunction(BsonValueFunction.class), PartitionIdFunction(PartitionIdFunction.class), DecodeBinaryFunction(DecodeBinaryFunction.class), - EncodeBinaryFunction(EncodeBinaryFunction.class); + EncodeBinaryFunction(EncodeBinaryFunction.class), + DecodeViewIdFunction(DecodeViewIndexIdFunction.class); ExpressionType(Class<? extends Expression> clazz) { this.clazz = clazz; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java new file mode 100644 index 0000000000..2f531b1cc1 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression.function; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Determinism; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.parse.FunctionParseNode; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.parse.DecodeViewIndexIdParseNode; +import org.apache.phoenix.parse.PhoenixRowTimestampParseNode; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.util.ByteUtil; + +import java.sql.Types; +import java.util.List; + +import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.NULL_DATA_TYPE_VALUE; +import static org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN; + +/** + * Function to return the ViewIndexId value based on the ViewIndexIDDataType field. + * Can also be used in sql predicates. 
+ * The ViewIndexId field value needs to be interpreted based on the type specified in the + * ViewIndexIdDataType field. + This is what the view index ids created by the various clients look like: + client VIEW_INDEX_ID(Cell number of bytes) VIEW_INDEX_ID_DATA_TYPE + pre-4.15 2 bytes NULL + post-4.15[config smallint] 2 bytes 5(smallint) + post-4.15[config bigint] 8 bytes -5(bigint) + + VIEW_INDEX_ID_DATA_TYPE, VIEW_INDEX_ID(Cell representation of the data) + NULL, SMALLINT -> RETRIEVE AND CONVERT TO BIGINT + SMALLINT, SMALLINT -> RETRIEVE AND CONVERT TO BIGINT + BIGINT, BIGINT -> DO NOT CONVERT + + */ +@BuiltInFunction(name = DecodeViewIndexIdFunction.NAME, + nodeClass= DecodeViewIndexIdParseNode.class, + args = {@FunctionParseNode.Argument(allowedTypes = { PLong.class}), + @FunctionParseNode.Argument(allowedTypes = { PInteger.class}) + }) +public class DecodeViewIndexIdFunction extends ScalarFunction { + + public static final String NAME = "DECODE_VIEW_INDEX_ID"; + + public DecodeViewIndexIdFunction() { + } + + /** + * @param children VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE expressions + */ + public DecodeViewIndexIdFunction(List<Expression> children) { + super(children); + + // It takes 2 parameters - VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE. + if ((children.size() != 2) || !children.get(0).getClass().isAssignableFrom( + KeyValueColumnExpression.class) || !children.get(1).getClass().isAssignableFrom( + KeyValueColumnExpression.class)) { + throw new IllegalArgumentException( + "DecodeViewIndexIdFunction should only have a " + + "VIEW_INDEX_ID and a VIEW_INDEX_ID_DATA_TYPE key value expression." 
+ ); + } + if (!(children.get(0).getDataType().equals(PLong.INSTANCE))) { + throw new IllegalArgumentException( + "DecodeViewIndexIdFunction should have an " + + "VIEW_INDEX_ID key value expression of type PLong" + ); + } + + if (!(children.get(1).getDataType().equals(PInteger.INSTANCE))) { + throw new IllegalArgumentException( + "DecodeViewIndexIdFunction should have an " + + "VIEW_INDEX_ID_DATA_TYPE key value expression of type PLong" + ); + } + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + if (tuple == null) { + return false; + } + + byte[] viewIndexIdCF = ((KeyValueColumnExpression) children.get(0)).getColumnFamily(); + byte[] viewIndexIdCQ = ((KeyValueColumnExpression) children.get(0)).getColumnQualifier(); + byte[] viewIndexIdTypeCF = ((KeyValueColumnExpression) children.get(1)).getColumnFamily(); + byte[] viewIndexIdTypeCQ = ((KeyValueColumnExpression) children.get(1)).getColumnQualifier(); + + Cell viewIndexIdCell = tuple.getValue(viewIndexIdCF, viewIndexIdCQ); + Cell viewIndexIdDataTypeCell = tuple.getValue(viewIndexIdTypeCF, viewIndexIdTypeCQ); + if ((viewIndexIdCell != null) && (viewIndexIdCell.getValueLength() == 0)) { + ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); + return true; + } + + + /* + This is what the combinations of view index ids created by different clients look like: + client VIEW_INDEX_ID(Cell number of bytes) VIEW_INDEX_ID_DATA_TYPE + pre-4.15 2 bytes NULL + post-4.15[config smallint] 2 bytes 5(smallint) + post-4.15[config bigint] 8 bytes -5(bigint) + + VIEW_INDEX_ID_DATA_TYPE, VIEW_INDEX_ID(Cell representation of the data) + NULL, SMALLINT -> RETRIEVE AND CONVERT TO BIGINT + SMALLINT, SMALLINT -> RETRIEVE AND CONVERT TO BIGINT + BIGINT, BIGINT -> DO NOT CONVERT + + */ + + if (viewIndexIdCell != null) { + int type = NULL_DATA_TYPE_VALUE; + if (viewIndexIdDataTypeCell != null) { + Object typeObject = PInteger.INSTANCE.toObject( 
viewIndexIdDataTypeCell.getValueArray(), + viewIndexIdDataTypeCell.getValueOffset(), + viewIndexIdDataTypeCell.getValueLength(), + PInteger.INSTANCE, + SortOrder.ASC); + if (typeObject != null) { + type = (Integer) typeObject; + } + } + + ImmutableBytesWritable columnValue = + new ImmutableBytesWritable(CellUtil.cloneValue(viewIndexIdCell)); + if ((type == NULL_DATA_TYPE_VALUE || type == Types.SMALLINT) && (viewIndexIdCell.getValueLength() < + VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN)) { + byte[] newBytes = PLong.INSTANCE.toBytes(PSmallint.INSTANCE.toObject(columnValue.get())); + ptr.set(newBytes, 0, newBytes.length); + } else { + ptr.set(columnValue.get(), columnValue.getOffset(), columnValue.getLength()); + } + } + return true; + } + + @Override + public PDataType getDataType() { + return PLong.INSTANCE; + } + + @Override + public boolean isStateless() { + return false; + } + + @Override + public Determinism getDeterminism() { + return Determinism.PER_ROW; + } + +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java index 37376c985e..d7f897a53b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java @@ -100,7 +100,15 @@ public class CreateTableStatement extends MutableStatement { Map<String, Integer> familyCounters, boolean noVerify) { this.tableName = tableName; this.props = props == null ? ImmutableListMultimap.<String,Pair<String,Object>>of() : props; - this.tableType = PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(tableName.getSchemaName()) ? 
PTableType.SYSTEM : tableType; + // When it is an index on SYSTEM.CATALOG tableType => PTableType.INDEX + // If Schema is SYSTEM and tableType = SYSTEM | TABLE => PTableType.SYSTEM + // else the passed in tableType + this.tableType = + (PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals( + tableName.getSchemaName()) && + (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) ? + PTableType.SYSTEM : + tableType); this.columns = columns == null ? ImmutableList.<ColumnDef>of() : ImmutableList.<ColumnDef>copyOf(columns); this.pkConstraint = pkConstraint == null ? PrimaryKeyConstraint.EMPTY : pkConstraint; this.splitNodes = splitNodes == null ? Collections.<ParseNode>emptyList() : ImmutableList.copyOf(splitNodes); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java new file mode 100644 index 0000000000..6b299de9aa --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.parse; + +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.function.DecodeViewIndexIdFunction; +import org.apache.phoenix.expression.function.FunctionExpression; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.IndexUtil; + +import java.sql.SQLException; +import java.util.List; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE; + +public class DecodeViewIndexIdParseNode extends FunctionParseNode { + + DecodeViewIndexIdParseNode(String name, List<ParseNode> children, + BuiltInFunctionInfo info) { + super(name, children, info); + // It takes 2 parameters - VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE. + if (children.size() != 2) { + throw new IllegalArgumentException( + "DecodeViewIndexIdParseNode should only have " + + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse nodes." + ); + } + if (children.get(0).getClass().isAssignableFrom(ColumnParseNode.class) + && children.get(1).getClass().isAssignableFrom(ColumnParseNode.class) + && (!(((ColumnParseNode) children.get(0)).getName().equals(VIEW_INDEX_ID)) + || !(((ColumnParseNode) children.get(1)).getName().equals(VIEW_INDEX_ID_DATA_TYPE))) + ) { + throw new IllegalArgumentException( + "DecodeViewIndexIdParseNode should only have " + + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse nodes." 
+ ); + } + + // CastParseNode is generated during IndexStatement rewriting + if (children.get(0).getClass().isAssignableFrom(CastParseNode.class) + && children.get(1).getClass().isAssignableFrom(CastParseNode.class) + && (!((ColumnParseNode) (((CastParseNode) children.get(0)).getChildren().get(0))).getName().equals( + IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, VIEW_INDEX_ID)) + || !((ColumnParseNode) (((CastParseNode) children.get(1)).getChildren().get(0))).getName().equals( + IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, VIEW_INDEX_ID_DATA_TYPE))) + ) { + throw new IllegalArgumentException( + "DecodeViewIndexIdParseNode should only have " + + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse nodes." + ); + } + + } + + @Override + public FunctionExpression create(List<Expression> children, StatementContext context) + throws SQLException { + return new DecodeViewIndexIdFunction(children); + } + +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 1f66ef11fa..dbd88bde2d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -85,6 +85,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TIMEOUT_DURING_UPGRADE_MS; 
import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks; import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks; @@ -1352,10 +1353,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. - // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use + // Also don't install on the SYSTEM.STATS table because we use // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318). + // With PHOENIX-7107 which introduced indexes on SYSTEM.CATALOG we need to install the + // indexing coprocessor on SYSTEM.CATALOG if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW && !isViewIndex) - && !SchemaUtil.isMetaTable(tableName) && !SchemaUtil.isStatsTable(tableName)) { if (isTransactional) { if (!newDesc.hasCoprocessor(QueryConstants.PHOENIX_TRANSACTIONAL_INDEXER_CLASSNAME)) { @@ -1785,8 +1787,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement TableDescriptorBuilder newDesc = generateTableDescriptor(physicalTableName, parentPhysicalTableName, existingDesc, tableType, props, families, splits, isNamespaceMapped); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("ensureTableCreated " + + "physicalTableName = {}, " + + "parentPhysicalTableName = {}, " + + "isUpgradeRequired = {}, " + + "isAutoUpgradeEnabled = {}, " + + "isDoNotUpgradePropSet = {}, " + + "isNamespaceMapped = {}, " + + "createdNamespace = {}", + Bytes.toString(physicalTableName), + Bytes.toString(parentPhysicalTableName), + isUpgradeRequired(), + isAutoUpgradeEnabled, + isDoNotUpgradePropSet, + isNamespaceMapped, + createdNamespace); + } + if (!tableExist) { - if (SchemaUtil.isSystemTable(physicalTableName) && !isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) { + if 
(SchemaUtil.isSystemTable(physicalTableName) && (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) && !isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) { // Disallow creating the SYSTEM.CATALOG or SYSTEM:CATALOG HBase table throw new UpgradeRequiredException(); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index be3801d118..bc9320723c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -431,6 +431,7 @@ public interface QueryServices extends SQLCloseable { // As opposed to a copy and async (out of band) delete. public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = "phoenix.move.child_link.during.upgrade"; + String SYSTEM_CATALOG_INDEXES_ENABLED = "phoenix.system.catalog.indexes.enabled"; /** * Parameter to indicate the source of operation attribute. * It can include metadata about the customer, service, etc. 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 9755572566..3bd94e4ffa 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -450,6 +450,8 @@ public class QueryServicesOptions { public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED = true; public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 300000; // 5 minutes + public static final boolean DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED = false; + public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index b07b0228ab..d04f01bf3b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1824,9 +1824,11 @@ public class MetaDataClient { } Configuration config = connection.getQueryServices().getConfiguration(); + // Do descendant view validation only when enabled and not a SYSTEM table/index if (!connection.getQueryServices().getProps() .getBoolean(DISABLE_VIEW_SUBTREE_VALIDATION, - DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION)) { + DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION) && + !QueryConstants.SYSTEM_SCHEMA_NAME.equals(dataTable.getSchemaName().getString())) { verifyIfDescendentViewsExtendPk(dataTable, config); } // for view indexes diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 11fdb8dbd3..4a16b1f32c 100644 --- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -83,8 +83,11 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES; +import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB; import static org.apache.phoenix.query.QueryServices.SKIP_SYSTEM_TABLES_EXISTENCE_CHECK; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE_BYTES; +import static org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED; import static org.apache.phoenix.schema.PTable.LinkType.PHYSICAL_TABLE; import static org.apache.phoenix.schema.PTable.LinkType.VIEW_INDEX_PARENT_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES; @@ -203,6 +206,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -272,6 +276,7 @@ import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; +import org.apache.phoenix.util.PhoenixRuntime; import 
org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -608,6 +613,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } private static boolean failConcurrentMutateAddColumnOneTimeForTesting = false; + private static final String FORCE_INDEX_MUTATE_METADATA_AS_ATTRIB = String.valueOf(Integer.MAX_VALUE); private RegionCoprocessorEnvironment env; private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost; @@ -1671,7 +1677,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); isSalted, baseColumnCount, isRegularView, columnTimestamp); } } - if (tableType == INDEX && ! isThisAViewIndex) { + // Ignoring meta indexes when looking for maxLookBackAge and TTL on parent tables. + // Due to failures in Namespace related ITs when isNamespaceMappingEnabled is enabled. + boolean isMetaIndex = (QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName.getString()) && (tableType == INDEX)); + if (tableType == INDEX && ! isThisAViewIndex && !isMetaIndex) { byte[] tableKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(), parentSchemaName == null ? null : parentSchemaName.getBytes(), parentTableName.getBytes()); maxLookbackAge = scanMaxLookbackAgeFromParent(tableKey, clientTimeStamp); @@ -1679,7 +1688,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); builder.setMaxLookbackAge(maxLookbackAge != null ? maxLookbackAge : (oldTable != null ? oldTable.getMaxLookbackAge() : null)); - if (tableType == INDEX && !isThisAViewIndex && ttl.equals(TTL_EXPRESSION_NOT_DEFINED)) { + if (tableType == INDEX && !isThisAViewIndex && ttl.equals(TTL_EXPRESSION_NOT_DEFINED) && !isMetaIndex) { //If this is an index on Table get TTL from Table byte[] tableKey = getTableKey(tenantId == null ? null : tenantId.getBytes(), parentSchemaName == null ? null : parentSchemaName.getBytes(), @@ -2798,14 +2807,23 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } } + + // Not sure whether this TODO is relevant anymore. 
PHOENIX-7107 introduces indexes + // on system table. // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the // system table. Basically, we get all the locks that we don't already hold for all the // tableMetadata rows. This ensures we don't have deadlock situations (ensuring // primary and then index table locks are held, in that order). For now, we just don't support // indexing on the system table. This is an issue because of the way we manage batch mutation // in the Indexer. - mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations, Collections.<byte[]>emptySet(), - HConstants.NO_NONCE, HConstants.NO_NONCE); + + // Update SYSTEM.CATALOG indexes only for + // 1. ordinary table/index mutations (create table/index). + // 2. When creating system indexes itself, no further index processing is required. + boolean updateCatalogIndexes = !SchemaUtil.isSystemTable(Bytes.toBytes(fullTableName)); + mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE, + updateCatalogIndexes); // Invalidate the cache - the next getTable call will add it // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache @@ -3138,10 +3156,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } throw new IllegalStateException(msg); } + // Update SYSTEM.CATALOG indexes only for + // 1. ordinary table/index mutations (drop table/index). + // 2. When dropping system indexes itself, no further index processing is required. 
+ boolean + updateCatalogIndexes = + (pTableType != INDEX) || (!Bytes.toString(schemaName) + .equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA)); // drop rows from catalog on this region - mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations, - Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE, + updateCatalogIndexes); long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata); for (ImmutableBytesPtr ckey : invalidateList) { @@ -3367,7 +3393,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp); - catalogMutations.add(delete); + // Remove duplicate mutations if present + if (Bytes.compareTo(key, 0, key.length, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()) != 0) { + catalogMutations.add(delete); + } results.clear(); scanner.next(results); } while (!results.isEmpty()); @@ -3746,8 +3775,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); throw new IllegalStateException(msg); } } - mutateRowsWithLocks(this.accessCheckEnabled, region, localMutations, - Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + // Update SYSTEM.CATALOG indexes only for ordinary table column mutations. + // Column mutations of indexes are not allowed. See above + // Add column on SYSTEM.CATALOG should not be processed for index updates, + // since an index on a future column cannot exist. 
+ boolean + updateCatalogIndexes = + !Bytes.toString(schemaName) + .equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); + + mutateRowsWithLocks(this.accessCheckEnabled, env, region, localMutations, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE, + updateCatalogIndexes); // Invalidate from cache for (ImmutableBytesPtr invalidateKey : invalidateList) { metaDataCache.invalidate(invalidateKey); @@ -4583,8 +4622,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); long serverTimestamp = EnvironmentEdgeManager.currentTimeMillis(); tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate( key, clientTimeStamp, serverTimestamp)); - mutateRowsWithLocks(this.accessCheckEnabled, region, tableMetadata, Collections.<byte[]>emptySet(), - HConstants.NO_NONCE, HConstants.NO_NONCE); + boolean updateCatalogIndexes = !Bytes.toString(schemaName) + .equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); + + mutateRowsWithLocks(this.accessCheckEnabled, env, region, tableMetadata, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE, updateCatalogIndexes); // Invalidate from cache Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -4835,7 +4878,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // Don't store function info for temporary functions. 
if (!temporaryFunction) { mutateRowsWithLocks(this.accessCheckEnabled, region, functionMetaData, - Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); } // Invalidate the cache - the next getFunction call will add it @@ -4891,8 +4935,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); done.run(MetaDataMutationResult.toProto(result)); return; } - mutateRowsWithLocks(this.accessCheckEnabled, region, functionMetaData, Collections.<byte[]>emptySet(), - HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(this.accessCheckEnabled, region, functionMetaData, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); long currentTime = MetaDataUtil.getClientTimeStamp(functionMetaData); @@ -5013,8 +5057,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); return; } } - mutateRowsWithLocks(this.accessCheckEnabled, region, schemaMutations, Collections.<byte[]>emptySet(), - HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(this.accessCheckEnabled, region, schemaMutations, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); // Invalidate the cache - the next getSchema call will add it Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = @@ -5065,8 +5109,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); done.run(MetaDataMutationResult.toProto(result)); return; } - mutateRowsWithLocks(this.accessCheckEnabled, region, schemaMetaData, Collections.<byte[]>emptySet(), - HConstants.NO_NONCE, HConstants.NO_NONCE); + mutateRowsWithLocks(this.accessCheckEnabled, region, schemaMetaData, + Collections.<byte[]>emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env) .getMetaDataCache(); long currentTime = 
MetaDataUtil.getClientTimeStamp(schemaMetaData); @@ -5135,18 +5179,56 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); /** * Perform atomic mutations on rows within a region - * + * additionally set metadata on mutations, if catalog indexes exists * @param accessCheckEnabled Use the login user to mutate rows if enabled - * @param region Region containing rows to be mutated - * @param mutations List of mutations for rows that must be contained within the region - * @param rowsToLock Rows to lock - * @param nonceGroup Optional nonce group of the operation - * @param nonce Optional nonce of the operation + * @param env The RegionCoprocessorEnvironment + * @param region Region containing rows to be mutated + * @param mutations List of mutations for rows that must be contained within the + * region + * @param rowsToLock Rows to lock + * @param nonceGroup Optional nonce group of the operation + * @param nonce Optional nonce of the operation + * @param updateCatalogIndexes check if Catalog indexes exists * @throws IOException */ + + static void mutateRowsWithLocks(final boolean accessCheckEnabled, + final RegionCoprocessorEnvironment env, final Region region, + final List<Mutation> mutations, final Set<byte[]> rowsToLock, final long nonceGroup, + final long nonce, boolean updateCatalogIndexes) throws IOException { + + try { + Configuration conf = env.getConfiguration(); + boolean catalogIndexesEnabled = conf.getBoolean(SYSTEM_CATALOG_INDEXES_ENABLED, DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED); + if ((updateCatalogIndexes) && (catalogIndexesEnabled)) { + setMetaDataOnMutationsIfCatalogIndexExists(env, mutations ); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Setting metadata on mutations :" + mutations); + } + } + } catch (SQLException e) { + throw new IOException(e); + } + + mutateRowsWithLocks(accessCheckEnabled, region, mutations, rowsToLock, nonceGroup, nonce); + } + + + /** + * Perform atomic mutations on rows within a region + * @param accessCheckEnabled Use the login 
user to mutate rows if enabled + * @param region Region containing rows to be mutated + * @param mutations List of mutations for rows that must be contained within the + * region + * @param rowsToLock Rows to lock + * @param nonceGroup Optional nonce group of the operation + * @param nonce Optional nonce of the operation + * @throws IOException + */ static void mutateRowsWithLocks(final boolean accessCheckEnabled, final Region region, final List<Mutation> mutations, final Set<byte[]> rowsToLock, final long nonceGroup, final long nonce) throws IOException { + // We need to mutate SYSTEM.CATALOG or SYSTEM.CHILD_LINK with HBase/login user // if access is enabled. if (accessCheckEnabled) { @@ -5172,6 +5254,40 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } } + private static void setMetaDataOnMutationsIfCatalogIndexExists( + final RegionCoprocessorEnvironment env, final List<Mutation> mutations) + throws SQLException, IOException { + final byte[] key = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); + ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(env).getMetaDataCache(); + PTable systemCatalogPTable = (PTable) metaDataCache.getIfPresent(cacheKey); + if (systemCatalogPTable == null) { + try (PhoenixConnection connection = getServerConnectionForMetaData(new Properties(), env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + systemCatalogPTable = PhoenixRuntime.getTableNoCache(connection, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + } catch (SQLException sqle) { + throw new IOException(String.format("Failed to get PTable for SYSTEM.CATALOG via getTableNoCache() : key = %s" , Bytes.toString(key)), sqle); + } + } + + if (systemCatalogPTable == null) { + throw new IOException(String.format("Failed to get PTable for SYSTEM.CATALOG via getTableNoCache() and GlobalCache: key = %s" , 
Bytes.toString(key))); + } + + if ((systemCatalogPTable.getIndexes().isEmpty())) { + LOGGER.debug("No indexes found for SYSTEM.CATALOG: key = {}", Bytes.toString(key)); + return; + } + Properties metaConnectionProps = new Properties(); + metaConnectionProps.setProperty(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, FORCE_INDEX_MUTATE_METADATA_AS_ATTRIB); + try (PhoenixConnection connection = getServerConnectionForMetaData(metaConnectionProps, env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(systemCatalogPTable, ptr, connection); + IndexMetaDataCacheClient.setMetaDataOnMutations(connection, systemCatalogPTable, mutations, ptr); + } + } + private TableName getParentPhysicalTableName(PTable table) { return (table .getType() == PTableType.VIEW || (table.getType() == INDEX && table.getViewIndexId() != null)) @@ -5197,7 +5313,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); * @return Connection object. * @throws SQLException If the Connection could not be retrieved. 
*/ - private static Connection getServerConnectionForMetaData(final Configuration config) + private static Connection getServerConnectionForMetaData(final Configuration config) throws SQLException { Preconditions.checkNotNull(config, "The configs must not be null"); return getServerConnectionForMetaData(new Properties(), config); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java index 8a3a8f681b..845155f6cc 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java @@ -162,6 +162,8 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor { public void run() { PhoenixConnection connForTask = null; try { + // TODO: Not sufficient info available when namespaces are enabled and index rebuild task are run + // getConnectionOnServer can fail when namespaces are enabled but SYSTEM namespace not available connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); String[] excludeStates = new String[] { PTable.TaskStatus.FAILED.toString(), PTable.TaskStatus.COMPLETED.toString() }; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java index c4072127aa..b3e68b8212 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java @@ -90,6 +90,7 @@ import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; @@ -108,6 +109,8 @@ import 
static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_ import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -504,7 +507,42 @@ public abstract class BaseRowKeyMatcherTestIT extends ParallelStatsDisabledIT { } - // Helper to get rowKeyMatcher from Metadata. + byte[] getRowKeyMatcherFromSyscatIndex(String tenantId, String schemaName, + String tableName, boolean useIndexTable) throws SQLException { + + final String + SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL = + "SELECT ROW_KEY_MATCHER FROM SYSTEM.CATALOG " + "WHERE %s AND TABLE_SCHEM <> 'SYSTEM' AND TABLE_NAME = '%s' AND ROW_KEY_MATCHER IS NOT NULL"; + final String SYS_CATALOG_IDX_ROW_KEY_MATCHER_HEADER_SQL = "SELECT \"0:ROW_KEY_MATCHER\" FROM SYSTEM.SYS_ROW_KEY_MATCHER_IDX " + "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'"; + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String + tenantClause = useIndexTable ? + (tenantId == null || tenantId.isEmpty() ? + "\":TENANT_ID\" IS NULL" : + String.format("\":TENANT_ID\" = '%s'", tenantId)) : + (tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId)); + String + sql = useIndexTable ? + String.format(SYS_CATALOG_IDX_ROW_KEY_MATCHER_HEADER_SQL, tenantClause, schemaName, + tableName) : + String.format(SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL, tenantClause, + tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + byte[] matcherBytes = rs.next() ? 
rs.getBytes(1) : EMPTY_BYTE_ARRAY; + LOGGER.info("Row key matcher SQL: {}", sql); + LOGGER.info("Row key matcher: {}, {}", + Bytes.toStringBinary(matcherBytes), + Bytes.toStringBinary(PVarbinaryEncoded.INSTANCE.toBytes(matcherBytes))); + return PVarbinaryEncoded.INSTANCE.toBytes(matcherBytes); + } + } + + // Helper to get rowKeyMatcher from Metadata. private Pair<String, byte[]> getRowKeyMatchersFromView(PhoenixConnection connection, PTable view) throws SQLException { return getRowKeyMatchersFromView(connection, view.getName().getString()); @@ -560,6 +598,12 @@ public abstract class BaseRowKeyMatcherTestIT extends ParallelStatsDisabledIT { PVarbinaryEncoded.INSTANCE.toBytes( WhereOptimizer.getRowKeyMatcher(connection, tableName, viewStatementTable, viewColumnConstantsToBe, isViewColumnReferencedToBe)); + byte[] + rowKeyMatcher3 = getRowKeyMatcherFromSyscatIndex(view.getTenantId() != null ? view.getTenantId().getString() : null, view.getSchemaName().getString(), view.getTableName().getString(), false); + + byte[] + rowKeyMatcher4 = getRowKeyMatcherFromSyscatIndex(view.getTenantId() != null ? 
view.getTenantId().getString() : null, view.getSchemaName().getString(), view.getTableName().getString(), true); + LOGGER.debug(String.format( "target-view-name = %s, physical = %s, stmt-table = %s\n, " + "row-matcher-0 = %s (syscat)\n, row-matcher-1 = %s\n, row-matcher-2 = %s\n", @@ -571,6 +615,10 @@ public abstract class BaseRowKeyMatcherTestIT extends ParallelStatsDisabledIT { Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher1) == 0); assertTrue("RowKey matcher patterns do not match", Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher2) == 0); + assertTrue("RowKey matcher patterns do not match", + Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher3) == 0); + assertTrue("RowKey matcher patterns do not match", + Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher4) == 0); return rowKeyMatcher1; } @@ -808,6 +856,15 @@ public abstract class BaseRowKeyMatcherTestIT extends ParallelStatsDisabledIT { List<PDataType[]> testCases = getTestCases(); SortOrder[][] sortOrders = getSortOrders(); + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + //TestUtil.dumpTable(conn, TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)); + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_VIEW_HDR_IDX ON SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY) INCLUDE (TABLE_TYPE, VIEW_STATEMENT, TTL, ROW_KEY_MATCHER) WHERE TABLE_TYPE = 'v'"); + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_ROW_KEY_MATCHER_IDX ON SYSTEM.CATALOG(ROW_KEY_MATCHER, TTL, TABLE_TYPE, TENANT_ID, TABLE_SCHEM, TABLE_NAME) INCLUDE (VIEW_STATEMENT) WHERE TABLE_TYPE = 'v' AND ROW_KEY_MATCHER IS NOT NULL"); + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_VIEW_INDEX_HDR_IDX ON SYSTEM.CATALOG(DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE), TENANT_ID, TABLE_SCHEM, TABLE_NAME) INCLUDE(TABLE_TYPE, LINK_TYPE, VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE) WHERE TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND 
VIEW_INDEX_ID IS NOT NULL"); + conn.commit(); + } + String tableName = ""; tableName = createViewHierarchy( testCases, sortOrders, 500, 5000, 3, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java new file mode 100644 index 0000000000..781cc9263a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java @@ -0,0 +1,988 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.phoenix.end2end.index; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.end2end.ViewTTLIT; +import org.apache.phoenix.hbase.index.write.TestTrackingParallelWriterIndexCommitter; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.HBaseFactoryProvider; +import org.apache.phoenix.query.PhoenixTestBuilder; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TableViewFinderResult; +import org.apache.phoenix.util.ViewUtil; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; 
+import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_INDEX_SYSTEM_TABLE; +import static org.apache.phoenix.exception.SQLExceptionCode.MISMATCHED_TOKEN; +import static org.apache.phoenix.hbase.index.write.IndexWriter.INDEX_COMMITTER_CONF_KEY; +import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.COLUMN_TYPES; +import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.TENANT_VIEW_COLUMNS; +import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED; +import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS; +import static org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category(NeedsOwnMiniClusterTest.class) +public class PartialSystemCatalogIndexIT extends ParallelStatsDisabledIT { + static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class); + static final int VIEW_TTL_10_SECS = 10; + static final int VIEW_TTL_300_SECS = 300; + static final int VIEW_TTL_120_SECS = 120; + + // Various Test System indexes + private final static String SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME = "INDEX_TABLE_LINK_TEST_INDEX"; + private final static String FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME = QueryConstants.SYSTEM_SCHEMA_NAME + "." 
+ SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME; + private final static String SYS_INDEX_HDR_TEST_INDEX_NAME = "INDEX_HDR_TEST_INDEX"; + private final static String FULL_SYS_INDEX_HDR_TEST_INDEX_NAME = QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_INDEX_HDR_TEST_INDEX_NAME; + private final static String SYS_VIEW_HDR_TEST_INDEX_NAME = "VIEW_HDR_TEST_INDEX"; + private final static String FULL_SYS_VIEW_HDR_TEST_INDEX_NAME = QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_VIEW_HDR_TEST_INDEX_NAME; + private final static String SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME = "VIEW_INDEX_HDR_TEST_INDEX"; + private final static String FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME = QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME; + private final static String SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME = "ROW_KEY_MATCHER_TEST_INDEX"; + private final static String FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME = QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME; + + // System Index creation statements + private final static String SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL = String.format( + "CREATE INDEX IF NOT EXISTS %s " + + "ON SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY) INCLUDE(TABLE_TYPE, LINK_TYPE) " + + "WHERE TABLE_TYPE = 'i' AND LINK_TYPE = 1", SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME); + private final static String SYS_INDEX_HDR_TEST_INDEX_SQL = String.format( + "CREATE INDEX IF NOT EXISTS %s " + + "ON SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, INDEX_STATE, DATA_TABLE_NAME) INCLUDE(TABLE_TYPE) " + + "WHERE TABLE_TYPE = 'i' AND INDEX_STATE IS NOT NULL", SYS_INDEX_HDR_TEST_INDEX_NAME); + private final static String SYS_VIEW_HDR_TEST_INDEX_SQL = String.format( + "CREATE INDEX IF NOT EXISTS %s " + + "ON SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY) " + + "INCLUDE (TABLE_TYPE, VIEW_STATEMENT, TTL, ROW_KEY_MATCHER) " + + "WHERE TABLE_TYPE = 'v'", 
SYS_VIEW_HDR_TEST_INDEX_NAME); + private final static String SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL = String.format( + "CREATE INDEX IF NOT EXISTS %s " + + "ON SYSTEM.CATALOG(ROW_KEY_MATCHER, TTL, TABLE_TYPE, TENANT_ID, TABLE_SCHEM, TABLE_NAME) " + + "INCLUDE (VIEW_STATEMENT) " + + "WHERE TABLE_TYPE = 'v' AND ROW_KEY_MATCHER IS NOT NULL", SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME); + private final static String SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL = String.format( + "CREATE INDEX IF NOT EXISTS %s " + + "ON SYSTEM.CATALOG(DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE), TENANT_ID, TABLE_SCHEM, TABLE_NAME) " + + "INCLUDE(TABLE_TYPE, LINK_TYPE, VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE) " + + "WHERE TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND VIEW_INDEX_ID IS NOT NULL", SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME); + + // SQLs on the data table - SYSTEM.CATALOG + static final String SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL = "SELECT ROW_KEY_MATCHER FROM SYSTEM.CATALOG " + + "WHERE %s AND TABLE_SCHEM <> 'SYSTEM' AND TABLE_NAME = '%s' AND " + "ROW_KEY_MATCHER IS NOT NULL"; + + static final String SYS_CATALOG_VIEW_TTL_HEADER_SQL = "SELECT TTL FROM SYSTEM.CATALOG " + + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = 'v'"; + + static final String SYS_CATALOG_VIEW_INDEX_HEADER_SQL = "SELECT VIEW_INDEX_ID FROM SYSTEM.CATALOG " + + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = 'i' AND LINK_TYPE IS NULL"; + + static final String SYS_CATALOG_SYS_INDEX_TABLE_SQL = "SELECT count(*) FROM SYSTEM.CATALOG " + + "WHERE TABLE_SCHEM = 'SYSTEM' AND TABLE_NAME = '%s'"; + + static final String SYS_CATALOG_INDEX_TABLE_LINK_SQL = "SELECT count(*) FROM SYSTEM.CATALOG " + + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = 'i'" + + " AND LINK_TYPE = 1"; + + static final String SYS_CATALOG_INDEX_HDR_SQL = "SELECT count(*) FROM SYSTEM.CATALOG " + + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = 'i'" + + " AND 
INDEX_STATE IS NOT NULL"; + + static final String SYS_CATALOG_COLUMN_EXISTS_SQL = "SELECT count(*) FROM SYSTEM.CATALOG " + + "WHERE TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s'" ; + + // SQL on the index table - SYSTEM.SYS_INDEX_TABLE_LINK_TEST_INDEX, + static final String SYS_CATALOG_IDX_INDEX_TABLE_LINK_SQL = "SELECT \":COLUMN_FAMILY\" FROM %s " + + "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ; + + // SQL on the index table - SYSTEM.SYS_INDEX_HDR_TEST_INDEX_NAME, + static final String SYS_CATALOG_IDX_INDEX_HDR_SQL = "SELECT \"0:DATA_TABLE_NAME\", \"0:INDEX_STATE\" FROM %s " + + "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ; + + // SQL on the index table - SYSTEM.SYS_VIEW_HDR_TEST_INDEX, + static final String SYS_CATALOG_IDX_VIEW_HEADER_SQL = "SELECT \"0:VIEW_STATEMENT\" FROM %s " + + "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ; + + // SQL on the index table - SYSTEM.SYS_VIEW_INDEX_HDR_TEST_INDEX, + static final String SYS_CATALOG_IDX_VIEW_INDEX_HEADER_SQL = "SELECT \": DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID,VIEW_INDEX_ID_DATA_TYPE)\" FROM %s " + + "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ; + + private static RegionCoprocessorEnvironment taskRegionEnvironment; + private static HBaseTestingUtility hbaseTestUtil; + + @BeforeClass + public static void doSetup() throws Exception { + InstanceResolver.clearSingletons(); + // Override to get required config for static fields loaded that require HBase config + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + + @Override public Configuration getConfiguration() { + Configuration conf = HBaseConfiguration.create(); + conf.set(SYSTEM_CATALOG_INDEXES_ENABLED, String.valueOf(true)); + return conf; + } + + @Override public Configuration getConfiguration(Configuration confToClone) { + Configuration conf = HBaseConfiguration.create(); + conf.set(SYSTEM_CATALOG_INDEXES_ENABLED, 
String.valueOf(true)); + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + conf.set(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); + conf.set(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); + hbaseTestUtil = new HBaseTestingUtility(conf); + setUpConfigForMiniCluster(conf); + conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + hbaseTestUtil.startMiniCluster(); + + + // Turn on the View TTL feature + Map<String, String> DEFAULT_PROPERTIES = new HashMap<String, String>() {{ + put(QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED, String.valueOf(true)); + put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, String.valueOf(true)); + // no max lookback + put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(0)); + put(QueryServices.PHOENIX_VIEW_TTL_ENABLED, Boolean.toString(true)); + put(QueryServices.PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT, String.valueOf(1)); + put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); + put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, + Long.toString(Long.MAX_VALUE)); + }}; + + setUpTestDriver(new ReadOnlyProps(ReadOnlyProps.EMPTY_PROPS, + DEFAULT_PROPERTIES.entrySet().iterator())); + + taskRegionEnvironment = + getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME) + .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(TaskRegionObserver.class.getName()); + + } + + + void assertSystemCatalogHasIndexHdr(String tenantId, String schemaName, + String tableName) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = 
connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId); + String sql = String + .format(SYS_CATALOG_INDEX_HDR_SQL, tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + int numRows = rs.next() ? rs.getInt(1) : 0; + + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), 1, numRows); + } + } + + void assertAdditionalColumnInMetaIndexTable(String schemaName, String indexName, String newColumnName, + boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String sql = String + .format(SYS_CATALOG_COLUMN_EXISTS_SQL, schemaName, indexName, newColumnName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + int numRows = rs.next() ? rs.getInt(1) : 0; + + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, indexName), exists ? 1 : 0, numRows); + } + } + + void assertSystemCatalogHasIndexTableLinks(String tenantId, String schemaName, + String tableName) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId); + String sql = String + .format(SYS_CATALOG_INDEX_TABLE_LINK_SQL, tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + int numRows = rs.next() ? 
rs.getInt(1) : 0; + + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), 1, numRows); + } + } + + void assertSystemCatalogHasViewIndexHeaderRelatedColumns(String tenantId, String schemaName, + String tableName, boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId); + String sql = String + .format(SYS_CATALOG_VIEW_INDEX_HEADER_SQL, tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + String viewIndexId = rs.next() ? rs.getString(1) : null; + if (exists) { + assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), viewIndexId); + } else { + assertNull(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), viewIndexId); + } + + } + } + + void assertSystemCatalogHasViewHeaderRelatedColumns(String tenantId, String schemaName, + String tableName, boolean exists, long ttlValueExpected) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId); + String sql = String + .format(SYS_CATALOG_VIEW_TTL_HEADER_SQL, tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + if (exists) { + String ttlStr = rs.next() ? rs.getString(1) : null; + long actualTTLValueReturned = ttlStr != null ? 
Integer.valueOf(ttlStr): 0; + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), ttlValueExpected, actualTTLValueReturned); + } else { + assertFalse(String.format("Rows do exists for schema = %s, table = %s", + schemaName, tableName), rs.next()); + + } + } + } + + void assertSystemCatalogHasRowKeyMatcherRelatedColumns(String tenantId, String schemaName, + String tableName, boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "TENANT_ID IS NULL" : + String.format("TENANT_ID = '%s'", tenantId); + String sql = String + .format(SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL, tenantClause, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + if (exists) { + byte[] matcherBytes = rs.next() ? rs.getBytes(1) : null; + assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), matcherBytes); + } else { + assertFalse(String.format("Rows do exists for schema = %s, table = %s", + schemaName, tableName), rs.next()); + + } + } + } + + String stripQuotes(String name) { + return name.replace("\"", ""); + } + + void assertSystemCatalogIndexTable(String systemCatalogIndexName, boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String sql = String.format(SYS_CATALOG_SYS_INDEX_TABLE_SQL, systemCatalogIndexName, + systemCatalogIndexName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + rs.next(); + assertTrue(String.format("Expected rows do not match for index-table = SYSTEM.%s", + systemCatalogIndexName), exists ? 
rs.getInt(1) > 0 : rs.getInt(1) == 0 ); + } + } + + void assertSystemCatalogIndexHaveIndexHdr(String systemCatalogIndexName, + String tenantId, String schemaName, + String tableName, boolean exists, String indexName, String expectedIndexState) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "\":TENANT_ID\" IS NULL" : + String.format("\":TENANT_ID\" = '%s'", tenantId); + String sql = String.format(SYS_CATALOG_IDX_INDEX_HDR_SQL, systemCatalogIndexName, + tenantClause, schemaName, indexName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + boolean hasRows = rs.next(); + String actualDataTableName = hasRows ? rs.getString(1) : null; + String actualIndexState = hasRows ? rs.getString(2) : null; + if (exists) { + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, indexName), tableName, actualDataTableName); + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, indexName), expectedIndexState, actualIndexState); + } else { + assertNull(String.format("Zero rows expected for schema = %s, table = %s", + schemaName, indexName), actualDataTableName); + assertNull(String.format("Zero rows expected for schema = %s, table = %s", + schemaName, indexName), actualIndexState); + } + } + } + + + void assertSystemCatalogIndexHaveIndexTableLinks(String systemCatalogIndexName, + String tenantId, String schemaName, + String tableName, boolean exists, String indexName) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? 
+ "\":TENANT_ID\" IS NULL" : + String.format("\":TENANT_ID\" = '%s'", tenantId); + String sql = String.format(SYS_CATALOG_IDX_INDEX_TABLE_LINK_SQL, systemCatalogIndexName, + tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + String colFamilyStr = rs.next() ? rs.getString(1) : null; + if (exists) { + assertEquals(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), indexName, colFamilyStr); + } else { + assertNull(String.format("Zero rows expected for schema = %s, table = %s", + schemaName, tableName), colFamilyStr); + } + } + } + + void assertSystemCatalogIndexHaveViewHeaders(String systemCatalogIndexName, + String tenantId, String schemaName, + String tableName, boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? + "\":TENANT_ID\" IS NULL" : + String.format("\":TENANT_ID\" = '%s'", tenantId); + String sql = String.format(SYS_CATALOG_IDX_VIEW_HEADER_SQL, systemCatalogIndexName, + tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + String viewStmt = rs.next() ? rs.getString(1) : null; + if (exists) { + assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), viewStmt); + } else { + assertNull(String.format("Zero rows expected for schema = %s, table = %s", + schemaName, tableName), viewStmt); + } + } + } + + + void assertSystemCatalogIndexHaveViewIndexHeaders(String systemCatalogIndexName, + String tenantId, String schemaName, + String tableName, boolean exists) throws SQLException { + + try (Connection connection = DriverManager.getConnection(getUrl())) { + Statement stmt = connection.createStatement(); + String tenantClause = tenantId == null || tenantId.isEmpty() ? 
+ "\":TENANT_ID\" IS NULL" : + String.format("\":TENANT_ID\" = '%s'", tenantId); + String sql = String.format(SYS_CATALOG_IDX_VIEW_INDEX_HEADER_SQL, systemCatalogIndexName, + tenantClause, schemaName, tableName); + stmt.execute(sql); + ResultSet rs = stmt.getResultSet(); + Integer viewIndexId = rs.next() ? rs.getInt(1) : null; + if (exists) { + assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s", + schemaName, tableName), viewIndexId); + } else { + assertNull(String.format("Zero rows expected for schema = %s, table = %s", + schemaName, tableName), viewIndexId); + } + } + } + + void dropSystemCatalogIndex(String sysIndexName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format("drop index %s ON SYSTEM.CATALOG", sysIndexName)); + conn.commit(); + } + } + + void alterIndexState(String indexName, String tableName, PIndexState newState) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute(String.format("alter index %s ON %s %s", indexName, tableName, newState.name())); + conn.commit(); + } + } + + void dropTableWithChildViews(String baseTable, int numTaskRuns) throws Exception { + // Drop the base table + + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + // Empty the task table first. + conn.createStatement() + .execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME); + + String dropTableSQL = String.format("DROP TABLE IF EXISTS %s CASCADE", baseTable); + conn.createStatement().execute(dropTableSQL); + // Run DropChildViewsTask to complete the tasks for dropping child views. 
The depth of the view tree is 2, + // so we expect that this will be done in two task handling runs as each non-root level will be processed + // in one run + + TaskRegionObserver.SelfHealingTask task = + new TaskRegionObserver.SelfHealingTask( + taskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); + for (int i = 0; i < numTaskRuns; i++) { + task.run(); + } + + assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, + null, null, null, null, null); + + // Views should be dropped by now + TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES); + TableViewFinderResult childViewsResult = new TableViewFinderResult(); + ViewUtil.findAllRelatives(getUtility().getConnection().getTable(linkTable), + HConstants.EMPTY_BYTE_ARRAY, + SchemaUtil.getSchemaNameFromFullName(baseTable).getBytes(), + SchemaUtil.getTableNameFromFullName(baseTable).getBytes(), + PTable.LinkType.CHILD_TABLE, + childViewsResult); + assertEquals(0, childViewsResult.getLinks().size()); + } + + + } + + static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, + String expectedTableName, String expectedTenantId, String expectedSchema, Timestamp expectedTs, + String expectedIndexName) + throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT * " + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " + + taskType.getSerializedValue()); + assertTrue(rs.next()); + String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS); + assertEquals(expectedStatus, taskStatus); + + if (expectedTableName != null) { + String tableName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME); + assertEquals(expectedTableName, tableName); + } + + if (expectedTenantId != null) { + String tenantId = rs.getString(PhoenixDatabaseMetaData.TENANT_ID); + assertEquals(expectedTenantId, tenantId); + 
} + + if (expectedSchema != null) { + String schema = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM); + assertEquals(expectedSchema, schema); + } + + if (expectedTs != null) { + Timestamp ts = rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS); + assertEquals(expectedTs, ts); + } + + if (expectedIndexName != null) { + String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA); + assertEquals(true, data.contains("\"IndexName\":\"" + expectedIndexName)); + } + } + + private List<String> getExplain(String query, Properties props) throws SQLException { + List<String> explainPlan = new ArrayList<>(); + try(Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement("EXPLAIN " + query); + ResultSet rs = statement.executeQuery()) { + while(rs.next()) { + String plan = rs.getString(1); + explainPlan.add(plan); + } + } + return explainPlan; + } + + + protected PhoenixTestBuilder.SchemaBuilder createLevel2TenantViewWithGlobalLevelTTL( + int globalTTL, + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions tenantViewOptions, + PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions tenantViewIndexOptions, + boolean allowIndex) throws Exception { + // Define the test schema. + // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. 
Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + + PhoenixTestBuilder.SchemaBuilder.TableOptions + tableOptions = PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions + globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults(); + // View TTL is set to 300s => 300000 ms + globalViewOptions.setTableProps(String.format("TTL=%d", globalTTL)); + + PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions + = PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewWithOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults(); + if (tenantViewOptions != null) { + tenantViewWithOverrideOptions = tenantViewOptions; + } + PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions + tenantViewIndexOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults(); + if (tenantViewIndexOptions != null) { + tenantViewIndexOverrideOptions = tenantViewIndexOptions; + } + if (allowIndex) { + schemaBuilder.withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions) + .withTenantViewIndexOptions(tenantViewIndexOverrideOptions) + .buildWithNewTenant(); + } else { + schemaBuilder.withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions) + .buildWithNewTenant(); + } + return schemaBuilder; + } + + protected PhoenixTestBuilder.SchemaBuilder createLevel1TenantView( + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions 
tenantViewOptions, + PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions tenantViewIndexOptions) throws Exception { + // Define the test schema. + // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + + PhoenixTestBuilder.SchemaBuilder.TableOptions + tableOptions = PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults(); + if (tenantViewOptions != null) { + tenantViewOverrideOptions = tenantViewOptions; + } + PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions + tenantViewIndexOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults(); + if (tenantViewIndexOptions != null) { + tenantViewIndexOverrideOptions = tenantViewIndexOptions; + } + + schemaBuilder.withTableOptions(tableOptions) + .withTenantViewOptions(tenantViewOverrideOptions) + .withTenantViewIndexOptions(tenantViewIndexOverrideOptions).buildNewView(); + return schemaBuilder; + } + + @Test + public void testIndexesOfIndexTableLinkTypeAndIndexHdrCondition() throws Exception { + + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewOptions = new PhoenixTestBuilder.SchemaBuilder.TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS)); + tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES)); + + // Create 2 level view + final PhoenixTestBuilder.SchemaBuilder + schemaBuilder = createLevel2TenantViewWithGlobalLevelTTL(VIEW_TTL_300_SECS, tenantViewOptions, null, + true); + + String tenantId = schemaBuilder.getDataOptions().getTenantId(); + String 
fullBaseTableName = schemaBuilder.getEntityTableName(); + String schemaName = stripQuotes( + SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String globalViewName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName())); + String fullGlobalViewName = schemaBuilder.getEntityGlobalViewName(); + String tenantViewName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String globalIndexName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewIndexName())); + String tenantIndexName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewIndexName())); + + // Assert View Header rows exists for global view + assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, globalViewName, true, VIEW_TTL_300_SECS); + // Assert View Header rows exists for tenant view + assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, tenantViewName, true, 0); + + // Assert index table link rows (link_type = 1) exists in SYSTEM. CATALOG + assertSystemCatalogHasIndexTableLinks(null, schemaName, globalViewName); + assertSystemCatalogHasIndexTableLinks(tenantId, schemaName, tenantViewName); + + // Assert index table header rows (table_type = 'i' AND INDEX_STATE IS NOT NULL) exists in SYSTEM. 
CATALOG + assertSystemCatalogHasIndexHdr(null, schemaName, globalIndexName); + assertSystemCatalogHasIndexHdr(tenantId, schemaName, tenantIndexName); + + //Create the SYSTEM.CATALOG index for Index Table links + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute(SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL); + stmt.execute(SYS_INDEX_HDR_TEST_INDEX_SQL); + conn.commit(); + } + LOGGER.info("Finished creating index: " + SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL); + LOGGER.info("Finished creating index: " + SYS_INDEX_HDR_TEST_INDEX_SQL); + + // Assert System Catalog index table has been created + assertSystemCatalogIndexTable(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, true); + assertSystemCatalogIndexTable(SYS_INDEX_HDR_TEST_INDEX_NAME, true); + // Assert appropriate rows are inserted in the SYSTEM.CATALOG index tables + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, null, schemaName, globalViewName, + true, globalIndexName); + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, + true, tenantIndexName); + + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, schemaName, globalViewName, + true, globalIndexName, PIndexState.ACTIVE.getSerializedValue()); + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, + true, tenantIndexName, PIndexState.ACTIVE.getSerializedValue()); + + // Alter the index state and verify meta index + alterIndexState(globalIndexName, fullGlobalViewName, PIndexState.UNUSABLE); + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, schemaName, globalViewName, + true, globalIndexName, PIndexState.INACTIVE.getSerializedValue()); + + alterIndexState(globalIndexName, fullGlobalViewName, PIndexState.REBUILD); + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, 
null, schemaName, globalViewName, + true, globalIndexName, PIndexState.ACTIVE.getSerializedValue()); + + LOGGER.info("Dropping base table " + fullBaseTableName); + dropTableWithChildViews(fullBaseTableName, 2); + assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, globalViewName, + false, VIEW_TTL_300_SECS); + assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, tenantViewName, + false, 0); + + // Assert appropriate rows are dropped/deleted in the SYSTEM.CATALOG index tables + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, null, schemaName, globalViewName, false, null); + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, false, null); + + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, schemaName, null, + false, globalIndexName, null); + assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, tenantId, schemaName, null, + false, tenantIndexName, null); + + dropSystemCatalogIndex(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME); + dropSystemCatalogIndex(SYS_INDEX_HDR_TEST_INDEX_NAME); + + // Assert System Catalog index table dropped + assertSystemCatalogIndexTable(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, false); + assertSystemCatalogIndexTable(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, false); + } + + @Test + public void testIndexesOfViewAndIndexHeadersCondition() throws Exception { + + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults(); + // View TTL is set to 120s => 120000 ms + tenantViewOptions.setTableProps(String.format("TTL=%d", VIEW_TTL_120_SECS)); + + final PhoenixTestBuilder.SchemaBuilder + schemaBuilder = createLevel1TenantView(tenantViewOptions, null); + String tenantId = schemaBuilder.getDataOptions().getTenantId(); + String fullBaseTableName = schemaBuilder.getEntityTableName(); + String 
schemaName = stripQuotes( + SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String tenantViewName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String indexOnTenantViewName = String + .format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix())); + + // TABLE_TYPE = 'v' + // Expected 1 rows - one for TenantView. + // Since the TTL property values are being set, + // we expect the view header columns to show up in regular queries + assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, tenantViewName, + true, VIEW_TTL_120_SECS); + // Assert index header rows (link_type IS NULL AND TABLE_TYPE = 'i') exists in SYSTEM. CATALOG + assertSystemCatalogHasViewIndexHeaderRelatedColumns(tenantId, schemaName, indexOnTenantViewName,true); + + assertSystemCatalogHasRowKeyMatcherRelatedColumns(tenantId, schemaName, tenantViewName,true); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + //TestUtil.dumpTable(conn, TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)); + stmt.execute(SYS_VIEW_HDR_TEST_INDEX_SQL); + stmt.execute(SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL); + stmt.execute(SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL); + + conn.commit(); + } + LOGGER.info("Finished creating index: " + SYS_VIEW_HDR_TEST_INDEX_SQL); + LOGGER.info("Finished creating index: " + SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL); + LOGGER.info("Finished creating index: " + SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL); + + /** + * Testing creation of SYS_INDEX rows + */ + + // Assert System Catalog index table has been created + assertSystemCatalogIndexTable(SYS_VIEW_HDR_TEST_INDEX_NAME, true); + assertSystemCatalogIndexTable(SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, true); + assertSystemCatalogIndexTable(SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, true); + // Assert appropriate rows are inserted in the SYSTEM.CATALOG index tables + 
assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, true); + assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, true); + assertSystemCatalogIndexHaveViewIndexHeaders(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, tenantId, schemaName, indexOnTenantViewName, true); + + /** + * Testing explain plans + */ + + List<String> plans = getExplain("select TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, TABLE_TYPE FROM SYSTEM.CATALOG WHERE TABLE_TYPE = 'v' ", new Properties()); + assertEquals(String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", FULL_SYS_VIEW_HDR_TEST_INDEX_NAME), plans.get(0)); + + plans = getExplain("select VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE FROM SYSTEM.CATALOG WHERE TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND VIEW_INDEX_ID IS NOT NULL", new Properties()); + assertEquals(String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME), plans.get(0)); + + plans = getExplain("select ROW_KEY_MATCHER, TTL, TABLE_NAME FROM SYSTEM.CATALOG WHERE TABLE_TYPE = 'v' AND ROW_KEY_MATCHER IS NOT NULL", new Properties()); + assertEquals(String.format("CLIENT PARALLEL 1-WAY RANGE SCAN OVER %s [not null]", FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME), plans.get(0)); + + /** + * Testing cleanup of SYS_INDEX rows after dropping tables and views + */ + LOGGER.info("Dropping base table " + fullBaseTableName); + dropTableWithChildViews(fullBaseTableName, 2); + // Assert view header rows (link_type IS NULL AND TABLE_TYPE = 'v') does not exist in SYSTEM.CATALOG + assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, tenantViewName, + false, VIEW_TTL_120_SECS); + // Assert view header rows (ROW_KEY_MATCHER IS NOT NULL does not exist in SYSTEM.CATALOG + assertSystemCatalogHasRowKeyMatcherRelatedColumns(tenantId, schemaName, tenantViewName,false); + // Assert index header rows (link_type IS NULL 
AND TABLE_TYPE = 'i') does not exists in SYSTEM.CATALOG + assertSystemCatalogHasViewIndexHeaderRelatedColumns(tenantId, schemaName, tenantViewName,false); + + // Assert appropriate rows are dropped/deleted in the SYSTEM.CATALOG index tables + assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, false); + assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, false); + assertSystemCatalogIndexHaveViewIndexHeaders(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, false); + + dropSystemCatalogIndex(SYS_VIEW_HDR_TEST_INDEX_NAME); + dropSystemCatalogIndex(SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME); + dropSystemCatalogIndex(SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME); + + // Assert System Catalog index table dropped + assertSystemCatalogIndexTable(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, false); + assertSystemCatalogIndexTable(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, false); + assertSystemCatalogIndexTable(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, false); + } + + @Test + public void testIndexesOnOtherSystemTables() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + try { + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_LINK_4_IDX ON SYSTEM.CHILD_LINK(TENANT_ID, TABLE_SCHEM, TABLE_NAME, LINK_TYPE) WHERE LINK_TYPE = 4"); + fail(); + } catch (SQLException sqle) { + Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), sqle.getErrorCode()); + } + try { + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_STATS_IDX ON SYSTEM.STATS(PHYSICAL_NAME, COLUMN_FAMILY, GUIDE_POST_WIDTH, GUIDE_POSTS_ROW_COUNT) WHERE COLUMN_FAMILY = '4'"); + fail(); + } catch (SQLException sqle) { + Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), sqle.getErrorCode()); + } + try { + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_LOG_IDX ON SYSTEM.LOG(USER, 
CLIENT_IP, QUERY) WHERE QUERY_ID = '4'"); + fail(); + } catch (SQLException sqle) { + Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), sqle.getErrorCode()); + } + + try { + stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_FUNCTION_IDX ON SYSTEM.FUNCTION(CLASS_NAME,JAR_PATH) WHERE FUNCTION_NAME = '4'"); + fail(); + } catch (SQLException sqle) { + Assert.assertEquals(MISMATCHED_TOKEN.getErrorCode(), sqle.getErrorCode()); + } + + } + } + + @Test + @Ignore + // TODO: needs to make this test more robust after fixing the deadlock + public void testAddColumnWithCascadeOnMetaIndexes() throws Exception { + + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewOptions = new PhoenixTestBuilder.SchemaBuilder.TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS)); + tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES)); + + // Create 2 level view + final PhoenixTestBuilder.SchemaBuilder + schemaBuilder = createLevel2TenantViewWithGlobalLevelTTL(VIEW_TTL_300_SECS, tenantViewOptions, null, + true); + + String tenantId = schemaBuilder.getDataOptions().getTenantId(); + String fullBaseTableName = schemaBuilder.getEntityTableName(); + String schemaName = stripQuotes( + SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String globalViewName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName())); + String fullGlobalViewName = schemaBuilder.getEntityGlobalViewName(); + String tenantViewName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String globalIndexName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewIndexName())); + String tenantIndexName = stripQuotes( + SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewIndexName())); + + // Assert View Header rows exists for global view + 
assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, globalViewName, true, VIEW_TTL_300_SECS); + // Assert View Header rows exists for tenant view + assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, tenantViewName, true, 0); + + // Assert index table link rows (link_type = 1) exists in SYSTEM. CATALOG + assertSystemCatalogHasIndexTableLinks(null, schemaName, globalViewName); + assertSystemCatalogHasIndexTableLinks(tenantId, schemaName, tenantViewName); + + // Assert index table header rows (table_type = 'i' AND INDEX_STATE IS NOT NULL) exists in SYSTEM. CATALOG + assertSystemCatalogHasIndexHdr(null, schemaName, globalIndexName); + assertSystemCatalogHasIndexHdr(tenantId, schemaName, tenantIndexName); + + ////////////////////////////////// + // TODO : fix the deadlock issue when partial index are present + //Create the SYSTEM.CATALOG index for Index Table links + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute(SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL); + stmt.execute(SYS_INDEX_HDR_TEST_INDEX_SQL); + conn.commit(); + } + LOGGER.info("Finished creating index: " + SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL); + LOGGER.info("Finished creating index: " + SYS_INDEX_HDR_TEST_INDEX_SQL); + /////////////////////////////////// + + // Assert System Catalog index table has been created + assertSystemCatalogIndexTable(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, true); + assertSystemCatalogIndexTable(SYS_INDEX_HDR_TEST_INDEX_NAME, true); + // Assert appropriate rows are inserted in the SYSTEM.CATALOG index tables + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, null, schemaName, globalViewName, + true, globalIndexName); + assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, tenantId, schemaName, tenantViewName, + true, tenantIndexName); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = 
conn.createStatement()) { + stmt.execute("ALTER TABLE SYSTEM.CATALOG ADD IF NOT EXISTS TEST_COL1 INTEGER DEFAULT 5 CASCADE INDEX ALL"); + } + + // Assert the new column exists in the meta index definition + assertAdditionalColumnInMetaIndexTable("SYSTEM", SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, "0:TEST_COL1", true); + assertAdditionalColumnInMetaIndexTable("SYSTEM", SYS_INDEX_HDR_TEST_INDEX_NAME, "0:TEST_COL1", true); + + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + stmt.execute("ALTER TABLE SYSTEM.CATALOG DROP COLUMN TEST_COL1"); + } + + // Assert the new column has been removed from the meta index definition + assertAdditionalColumnInMetaIndexTable("SYSTEM", SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, "0:TEST_COL1", false); + assertAdditionalColumnInMetaIndexTable("SYSTEM", SYS_INDEX_HDR_TEST_INDEX_NAME, "0:TEST_COL1", false); + + dropSystemCatalogIndex(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME); + dropSystemCatalogIndex(SYS_INDEX_HDR_TEST_INDEX_NAME); + + // Assert System Catalog index table dropped + assertSystemCatalogIndexTable(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, false); + assertSystemCatalogIndexTable(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, false); + + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java new file mode 100644 index 0000000000..4d8cd10a9d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ + +package org.apache.phoenix.hbase.index.write; + +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.compat.hbase.CompatUtil; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; + +public class TestTrackingParallelWriterIndexCommitter extends TrackingParallelWriterIndexCommitter { + private static final Logger LOGGER = + LoggerFactory.getLogger(TestTrackingParallelWriterIndexCommitter.class); + + private HTableFactory testFactory; + + public static class TestConnectionFactory extends ServerUtil.ConnectionFactory { + + private static Map<ServerUtil.ConnectionType, Connection> connections = + new ConcurrentHashMap<ServerUtil.ConnectionType, Connection>(); + + public static Connection getConnection(final ServerUtil.ConnectionType connectionType, final RegionCoprocessorEnvironment env) { + final String key = String.format("%s-%s", env.getServerName(), connectionType.name().toLowerCase()); + LOGGER.info("Connecting to {}", key); + 
return connections.computeIfAbsent(connectionType, new Function<ServerUtil.ConnectionType, Connection>() { + @Override + public Connection apply(ServerUtil.ConnectionType t) { + try { + return CompatUtil.createShortCircuitConnection(getTypeSpecificConfiguration(connectionType, env.getConfiguration()), env); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + } + + public static void shutdown() { + synchronized (ServerUtil.ConnectionFactory.class) { + for (Connection connection : connections.values()) { + try { + LOGGER.info("Closing connection to {}", connection.getClusterId()); + connection.close(); + } catch (IOException e) { + LOGGER.warn("Unable to close coprocessor connection", e); + } + } + connections.clear(); + } + } + + public static int getConnectionsCount() { + return connections.size(); + } + + } + + + public static class TestCoprocessorHConnectionTableFactory extends IndexWriterUtils.CoprocessorHConnectionTableFactory { + + @GuardedBy("TestCoprocessorHConnectionTableFactory.this") + private RegionCoprocessorEnvironment env; + private ServerUtil.ConnectionType connectionType; + + TestCoprocessorHConnectionTableFactory(RegionCoprocessorEnvironment env, + ServerUtil.ConnectionType connectionType) { + super(env, connectionType); + this.env = env; + this.connectionType = connectionType; + } + + @Override + public Connection getConnection() throws IOException { + return TestConnectionFactory.getConnection(connectionType, env); + } + + @Override + public synchronized void shutdown() { + TestConnectionFactory.shutdown(); + } + } + + @Override + void setup(HTableFactory factory, ExecutorService pool, Stoppable stop, + RegionCoprocessorEnvironment env) { + LOGGER.info("Setting up TestCoprocessorHConnectionTableFactory "); + testFactory = + new TestCoprocessorHConnectionTableFactory(env, + ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS); + super.setup(testFactory, pool, stop, env); + + } + + @Override + public void 
stop(String why) { + LOGGER.info("Stopping TestTrackingParallelWriterIndexCommitter " + why); + testFactory.shutdown(); + super.stop(why); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java index a45f5d4497..691adb9775 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java @@ -20,6 +20,7 @@ package org.apache.phoenix.jdbc; import org.apache.hadoop.hbase.*; import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl; import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl; +import org.apache.phoenix.hbase.index.write.TestTrackingParallelWriterIndexCommitter; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; import org.apache.commons.lang3.RandomUtils; @@ -55,6 +56,7 @@ import static org.apache.hadoop.hbase.ipc.RpcClient.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.test.GenericTestUtils.waitFor; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY; +import static org.apache.phoenix.hbase.index.write.IndexWriter.INDEX_COMMITTER_CONF_KEY; import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT; import static org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR; import static org.apache.phoenix.jdbc.HighAvailabilityGroup.*; @@ -702,6 +704,9 @@ public class HighAvailabilityTestingUtility { // Phoenix Region Server Endpoint needed for metadata caching conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY, PhoenixRegionServerEndpointTestImpl.class.getName()); + 
conf.set(INDEX_COMMITTER_CONF_KEY, + TestTrackingParallelWriterIndexCommitter.class.getName()); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java index 368a9c52a4..b5f6d67a80 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java @@ -23,11 +23,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import org.apache.phoenix.end2end.ConnectionQueryServicesTestImpl; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.ConnectionlessQueryServicesImpl; import org.apache.phoenix.query.QueryServices; @@ -35,7 +39,7 @@ import org.apache.phoenix.query.QueryServicesTestImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; - +import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull; /** * @@ -50,16 +54,15 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { private final ReadOnlyProps overrideProps; - @GuardedBy("this") private final QueryServices queryServices; - @GuardedBy("this") - private boolean closed = false; - - @GuardedBy("this") private final Map<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesMap = new HashMap<>(); + @GuardedBy("closeLock") + private volatile boolean closed = false; + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + public PhoenixTestDriver() { 
this(ReadOnlyProps.EMPTY_PROPS); } @@ -71,9 +74,14 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { } @Override - public synchronized QueryServices getQueryServices() { - checkClosed(); - return queryServices; + public QueryServices getQueryServices() throws SQLException { + lockInterruptibly(PhoenixTestDriver.LockMode.READ); + try { + checkClosed(); + return queryServices; + } finally { + unlock(PhoenixTestDriver.LockMode.READ); + } } @Override @@ -83,43 +91,60 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { } @Override - public synchronized Connection connect(String url, Properties info) throws SQLException { - checkClosed(); - return super.connect(url, info); + public Connection connect(String url, Properties info) throws SQLException { + lockInterruptibly(PhoenixTestDriver.LockMode.READ); + try { + checkClosed(); + return super.connect(url, info); + } finally { + unlock(PhoenixTestDriver.LockMode.READ); + } } @Override // public for testing - public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties infoIn) throws SQLException { - checkClosed(); - final Properties info = PropertiesUtil.deepCopy(infoIn); - ConnectionInfo connInfo = ConnectionInfo.create(url, null, info); - ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(connInfo); - if (connectionQueryServices != null) { + public ConnectionQueryServices getConnectionQueryServices(String url, Properties infoIn) throws SQLException { + lockInterruptibly(PhoenixTestDriver.LockMode.READ); + try { + checkClosed(); + final Properties info = PropertiesUtil.deepCopy(infoIn); + ConnectionInfo connInfo = ConnectionInfo.create(url, null, info); + ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(connInfo); + if (connectionQueryServices != null) { + return connectionQueryServices; + } + info.putAll(connInfo.asProps().asMap()); + if (connInfo.isConnectionless()) { + connectionQueryServices = 
new ConnectionlessQueryServicesImpl(queryServices, connInfo, info); + } else { + connectionQueryServices = new ConnectionQueryServicesTestImpl(queryServices, connInfo, info); + } + connectionQueryServices.init(url, info); + connectionQueryServicesMap.put(connInfo, connectionQueryServices); return connectionQueryServices; + } finally { + unlock(PhoenixTestDriver.LockMode.READ); } - info.putAll(connInfo.asProps().asMap()); - if (connInfo.isConnectionless()) { - connectionQueryServices = new ConnectionlessQueryServicesImpl(queryServices, connInfo, info); - } else { - connectionQueryServices = new ConnectionQueryServicesTestImpl(queryServices, connInfo, info); - } - connectionQueryServices.init(url, info); - connectionQueryServicesMap.put(connInfo, connectionQueryServices); - return connectionQueryServices; } - - private synchronized void checkClosed() { + + @GuardedBy("closeLock") + private void checkClosed() { if (closed) { throw new IllegalStateException("The Phoenix jdbc test driver has been closed."); } } @Override - public synchronized void close() throws SQLException { - if (closed) { - return; + public void close() throws SQLException { + lockInterruptibly(PhoenixTestDriver.LockMode.WRITE); + + try { + if (closed) { + return; + } + closed = true; + } finally { + unlock(PhoenixTestDriver.LockMode.WRITE); } - closed = true; try { for (ConnectionQueryServices cqs : connectionQueryServicesMap.values()) { cqs.close(); @@ -134,4 +159,41 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { } } } + private enum LockMode { + READ, WRITE + }; + + private void lockInterruptibly(PhoenixTestDriver.LockMode mode) throws SQLException { + checkNotNull(mode); + switch (mode) { + case READ: + try { + closeLock.readLock().lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) + .setRootCause(e).build().buildException(); + } + break; + case WRITE: + 
try { + closeLock.writeLock().lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) + .setRootCause(e).build().buildException(); + } + } + } + + private void unlock(PhoenixTestDriver.LockMode mode) { + checkNotNull(mode); + switch (mode) { + case READ: + closeLock.readLock().unlock(); + break; + case WRITE: + closeLock.writeLock().unlock(); + } + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java new file mode 100644 index 0000000000..e661acad6e --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.parse; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DecodeViewIndexIdFunctionTest { + + @Test + public void testExpressionWithDecodeViewIndexIdFunction() throws Exception { + ParseNode parseNode = SQLParser.parseCondition("DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE) = 32768"); + boolean hasGetViewIndexIdParseNode = false; + for (ParseNode childNode : parseNode.getChildren()) { + if (childNode.getClass().isAssignableFrom(DecodeViewIndexIdParseNode.class)) { + assertEquals(2, childNode.getChildren().size()); + hasGetViewIndexIdParseNode = true; + } + } + assertTrue(hasGetViewIndexIdParseNode); + } + + @Test + public void testValidationForDecodeViewIndexIdFunction() throws Exception { + boolean hasGetViewIndexIdParseNode = false; + try { + ParseNode parseNode = SQLParser.parseCondition("DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, b) = 32768"); + for (ParseNode childNode : parseNode.getChildren()) { + if (childNode.getClass().isAssignableFrom(DecodeViewIndexIdParseNode.class)) { + assertEquals(2, childNode.getChildren().size()); + hasGetViewIndexIdParseNode = true; + } + } + } catch (Exception e) { + hasGetViewIndexIdParseNode = false; + + } + assertFalse(hasGetViewIndexIdParseNode); + } + +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 7d5ebbb849..f9e68fefb4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -152,6 +152,7 @@ import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import 
org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; @@ -655,6 +656,9 @@ public abstract class BaseTest { conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1); conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0); + conf.set(IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY, + "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec"); + // This results in processing one row at a time in each next operation of the aggregate region // scanner, i.e., one row pages. In other words, 0ms page allows only one row to be processed // within one page; 0ms page is equivalent to one-row page