This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit a5ea1189424913af098a5bbd55cdee035f46f9ba
Author: Jacob Isaac <jacobpisaa...@gmail.com>
AuthorDate: Thu Apr 24 17:12:46 2025 -0700

    PHOENIX-7107 Add support for indexing on SYSTEM.CATALOG table (#2048)
---
 .../phoenix/compile/CreateIndexCompiler.java       |  38 +
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +
 .../apache/phoenix/expression/ExpressionType.java  |   3 +-
 .../function/DecodeViewIndexIdFunction.java        | 183 ++++
 .../apache/phoenix/parse/CreateTableStatement.java |  10 +-
 .../phoenix/parse/DecodeViewIndexIdParseNode.java  |  79 ++
 .../phoenix/query/ConnectionQueryServicesImpl.java |  26 +-
 .../org/apache/phoenix/query/QueryServices.java    |   1 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |   4 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  | 166 +++-
 .../phoenix/coprocessor/TaskRegionObserver.java    |   2 +
 .../phoenix/end2end/BaseRowKeyMatcherTestIT.java   |  59 +-
 .../end2end/index/PartialSystemCatalogIndexIT.java | 988 +++++++++++++++++++++
 .../TestTrackingParallelWriterIndexCommitter.java  | 117 +++
 .../jdbc/HighAvailabilityTestingUtility.java       |   5 +
 .../org/apache/phoenix/jdbc/PhoenixTestDriver.java | 128 ++-
 .../parse/DecodeViewIndexIdFunctionTest.java       |  59 ++
 .../java/org/apache/phoenix/query/BaseTest.java    |   4 +
 19 files changed, 1813 insertions(+), 65 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index c98f9a6a5c..bcdae4013d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.compile;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.Scan;
@@ -32,6 +33,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.SubqueryParseNode;
@@ -73,6 +75,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import static 
org.apache.phoenix.query.QueryServices.DEFAULT_SYSTEM_KEEP_DELETED_CELLS_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED;
+
 public class CreateIndexCompiler {
     private final PhoenixStatement statement;
     private final Operation operation;
@@ -230,6 +236,7 @@ public class CreateIndexCompiler {
     }
     public MutationPlan compile(final CreateIndexStatement create) throws 
SQLException {
         final PhoenixConnection connection = statement.getConnection();
+        verifyDataTable(connection, create.getTable());
         final ColumnResolver resolver
                 = FromCompiler.getResolverForCreateIndex(
                         create, connection, create.getUdfParseNodes());
@@ -278,4 +285,35 @@ public class CreateIndexCompiler {
                
         };
     }
+
+    /**
+     * Helper method to validate CREATE INDEX statements on SYSTEM tables.
+     * 1. Pass if no schema name is provided; the table is then assumed not to be a SYSTEM table.
+     * 2. Fail if the schema is SYSTEM and SYSTEM_CATALOG_INDEXES_ENABLED is not enabled.
+     * 3. Fail if the schema is SYSTEM and the table is anything other than SYSTEM.CATALOG.
+     *
+     * @param connection
+     * @param table
+     * @throws SQLException
+     */
+    private void verifyDataTable(PhoenixConnection connection, NamedTableNode 
table) throws SQLException {
+        Configuration conf = connection.getQueryServices().getConfiguration();
+        boolean catalogIndexesEnabled = 
conf.getBoolean(SYSTEM_CATALOG_INDEXES_ENABLED, 
DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED);
+
+        TableName tableName = table.getName();
+        if (tableName.getSchemaName() == null) {
+            return;
+        }
+        if 
(tableName.getSchemaName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME)
 &&
+                !catalogIndexesEnabled) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SYSTEM_TABLE_INDEXES_NOT_ENABLED).
+                    build().buildException();
+        }
+
+        if 
(tableName.getSchemaName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME)
 &&
+            
!tableName.getTableName().equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE))
 {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_INDEX_SYSTEM_TABLE).
+                    build().buildException();
+        }
+    }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index a2559c2f2f..f1fdbce009 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -113,6 +113,10 @@ public enum SQLExceptionCode {
             " Index where clause cannot include a subquery."),
     CANNOT_EVALUATE_INDEX_WHERE(304, "23102",
             "Invalid index where clause. It cannot be evaluated on a data 
table row."),
+    SYSTEM_TABLE_INDEXES_NOT_ENABLED(305, "23103",
+            "Invalid index on table. Indexes on SYSTEM tables are not 
enabled."),
+    CANNOT_INDEX_SYSTEM_TABLE(306, "23104",
+            "Invalid index on table. SYSTEM Indexes can only be on 
SYSTEM.CATALOG table."),
     /**
      * Invalid Cursor State (errorcode 04, sqlstate 24)
      */
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 1fb68d81ae..453925ef5d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -202,7 +202,8 @@ public enum ExpressionType {
     BsonValueFunction(BsonValueFunction.class),
     PartitionIdFunction(PartitionIdFunction.class),
     DecodeBinaryFunction(DecodeBinaryFunction.class),
-    EncodeBinaryFunction(EncodeBinaryFunction.class);
+    EncodeBinaryFunction(EncodeBinaryFunction.class),
+    DecodeViewIdFunction(DecodeViewIndexIdFunction.class);
 
     ExpressionType(Class<? extends Expression> clazz) {
         this.clazz = clazz;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java
new file mode 100644
index 0000000000..2f531b1cc1
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/DecodeViewIndexIdFunction.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.DecodeViewIndexIdParseNode;
+import org.apache.phoenix.parse.PhoenixRowTimestampParseNode;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.sql.Types;
+import java.util.List;
+
+import static 
org.apache.phoenix.util.ViewIndexIdRetrieveUtil.NULL_DATA_TYPE_VALUE;
+import static 
org.apache.phoenix.util.ViewIndexIdRetrieveUtil.VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN;
+
+/**
+ * Function to return the ViewIndexId value based on the ViewIndexIdDataType field.
+ * Can also be used in SQL predicates.
+ * The ViewIndexId field value needs to be interpreted based on the type specified in the
+ * ViewIndexIdDataType field.
+ This is what the view index ids created by the various clients look like:
+ client                  VIEW_INDEX_ID(Cell number of bytes)     VIEW_INDEX_ID_DATA_TYPE
+ pre-4.15                        2 bytes                                     NULL
+ post-4.15[config smallint]      2 bytes                                     5(smallint)
+ post-4.15[config bigint]        8 bytes                                     -5(bigint)
+
+ VIEW_INDEX_ID_DATA_TYPE,      VIEW_INDEX_ID(Cell representation of the data)
+ NULL,                         SMALLINT         -> RETRIEVE AND CONVERT TO BIGINT
+ SMALLINT,                     SMALLINT         -> RETRIEVE AND CONVERT TO BIGINT
+ BIGINT,                       BIGINT           -> DO NOT CONVERT
+
+ */
+@BuiltInFunction(name = DecodeViewIndexIdFunction.NAME,
+        nodeClass= DecodeViewIndexIdParseNode.class,
+        args = {@FunctionParseNode.Argument(allowedTypes = { PLong.class}),
+                @FunctionParseNode.Argument(allowedTypes = { PInteger.class})
+        })
+public class DecodeViewIndexIdFunction extends ScalarFunction {
+
+    public static final String NAME = "DECODE_VIEW_INDEX_ID";
+
+    public DecodeViewIndexIdFunction() {
+    }
+
+    /**
+     *  @param children VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE expressions
+     */
+    public DecodeViewIndexIdFunction(List<Expression> children) {
+        super(children);
+
+        // It takes 2 parameters - VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE.
+        if ((children.size() != 2) || 
!children.get(0).getClass().isAssignableFrom(
+                KeyValueColumnExpression.class) || 
!children.get(1).getClass().isAssignableFrom(
+                KeyValueColumnExpression.class)) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdFunction should only have a "
+                            + "VIEW_INDEX_ID and a VIEW_INDEX_ID_DATA_TYPE key 
value expression."
+            );
+        }
+        if (!(children.get(0).getDataType().equals(PLong.INSTANCE))) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdFunction should have an "
+                            + "VIEW_INDEX_ID key value expression of type 
PLong"
+            );
+        }
+
+        if (!(children.get(1).getDataType().equals(PInteger.INSTANCE))) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdFunction should have an "
+                            + "VIEW_INDEX_ID_DATA_TYPE key value expression of 
type PLong"
+            );
+        }
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (tuple == null) {
+            return false;
+        }
+
+        byte[] viewIndexIdCF = ((KeyValueColumnExpression) 
children.get(0)).getColumnFamily();
+        byte[] viewIndexIdCQ = ((KeyValueColumnExpression) 
children.get(0)).getColumnQualifier();
+        byte[] viewIndexIdTypeCF = ((KeyValueColumnExpression) 
children.get(1)).getColumnFamily();
+        byte[] viewIndexIdTypeCQ = ((KeyValueColumnExpression) 
children.get(1)).getColumnQualifier();
+
+        Cell viewIndexIdCell = tuple.getValue(viewIndexIdCF, viewIndexIdCQ);
+        Cell viewIndexIdDataTypeCell = tuple.getValue(viewIndexIdTypeCF, 
viewIndexIdTypeCQ);
+        if ((viewIndexIdCell != null) && (viewIndexIdCell.getValueLength() == 
0)) {
+            ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            return true;
+        }
+
+
+        /*
+        This is what the view index ids created by the different clients look like:
+            client                  VIEW_INDEX_ID(Cell number of bytes)     VIEW_INDEX_ID_DATA_TYPE
+        pre-4.15                        2 bytes                                     NULL
+        post-4.15[config smallint]      2 bytes                                     5(smallint)
+        post-4.15[config bigint]        8 bytes                                     -5(bigint)
+
+        VIEW_INDEX_ID_DATA_TYPE,      VIEW_INDEX_ID(Cell representation of the data)
+            NULL,                         SMALLINT         -> RETRIEVE AND CONVERT TO BIGINT
+            SMALLINT,                     SMALLINT         -> RETRIEVE AND CONVERT TO BIGINT
+            BIGINT,                       BIGINT           -> DO NOT CONVERT
+
+         */
+
+        if (viewIndexIdCell != null) {
+            int type = NULL_DATA_TYPE_VALUE;
+            if (viewIndexIdDataTypeCell != null) {
+                Object typeObject = PInteger.INSTANCE.toObject(
+                        viewIndexIdDataTypeCell.getValueArray(),
+                        viewIndexIdDataTypeCell.getValueOffset(),
+                        viewIndexIdDataTypeCell.getValueLength(),
+                        PInteger.INSTANCE,
+                        SortOrder.ASC);
+                if (typeObject != null) {
+                    type = (Integer) typeObject;
+                }
+            }
+
+            ImmutableBytesWritable columnValue =
+                    new 
ImmutableBytesWritable(CellUtil.cloneValue(viewIndexIdCell));
+            if ((type == NULL_DATA_TYPE_VALUE || type == Types.SMALLINT) && 
(viewIndexIdCell.getValueLength() <
+                    VIEW_INDEX_ID_BIGINT_TYPE_PTR_LEN)) {
+                byte[] newBytes = 
PLong.INSTANCE.toBytes(PSmallint.INSTANCE.toObject(columnValue.get()));
+                ptr.set(newBytes, 0, newBytes.length);
+            } else {
+                ptr.set(columnValue.get(), columnValue.getOffset(), 
columnValue.getLength());
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PLong.INSTANCE;
+    }
+
+    @Override
+    public boolean isStateless() {
+        return false;
+    }
+
+    @Override
+    public Determinism getDeterminism() {
+        return Determinism.PER_ROW;
+    }
+
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
index 37376c985e..d7f897a53b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
@@ -100,7 +100,15 @@ public class CreateTableStatement extends MutableStatement 
{
                                    Map<String, Integer> familyCounters, 
boolean noVerify) {
         this.tableName = tableName;
         this.props = props == null ? 
ImmutableListMultimap.<String,Pair<String,Object>>of() : props;
-        this.tableType = 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(tableName.getSchemaName()) 
? PTableType.SYSTEM : tableType;
+        // When it is an index on SYSTEM.CATALOG tableType => PTableType.INDEX
+        // If Schema is SYSTEM and tableType = SYSTEM | TABLE => 
PTableType.SYSTEM
+        // else the passed in tableType
+        this.tableType =
+                (PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(
+                        tableName.getSchemaName()) &&
+                        (tableType == PTableType.TABLE || tableType == 
PTableType.SYSTEM) ?
+                        PTableType.SYSTEM :
+                        tableType);
         this.columns = columns == null ? ImmutableList.<ColumnDef>of() : 
ImmutableList.<ColumnDef>copyOf(columns);
         this.pkConstraint = pkConstraint == null ? PrimaryKeyConstraint.EMPTY 
: pkConstraint;
         this.splitNodes = splitNodes == null ? 
Collections.<ParseNode>emptyList() : ImmutableList.copyOf(splitNodes);
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java
new file mode 100644
index 0000000000..6b299de9aa
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DecodeViewIndexIdParseNode.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.DecodeViewIndexIdFunction;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.IndexUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
+
+public class DecodeViewIndexIdParseNode extends FunctionParseNode {
+
+    DecodeViewIndexIdParseNode(String name, List<ParseNode> children,
+                                 BuiltInFunctionInfo info) {
+        super(name, children, info);
+        // It takes 2 parameters - VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE.
+        if (children.size() != 2) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdParseNode should only have "
+                            + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse 
nodes."
+            );
+        }
+        if (children.get(0).getClass().isAssignableFrom(ColumnParseNode.class)
+                && 
children.get(1).getClass().isAssignableFrom(ColumnParseNode.class)
+                && (!(((ColumnParseNode) 
children.get(0)).getName().equals(VIEW_INDEX_ID))
+                || !(((ColumnParseNode) 
children.get(1)).getName().equals(VIEW_INDEX_ID_DATA_TYPE)))
+        ) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdParseNode should only have "
+                            + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse 
nodes."
+            );
+        }
+
+        // CastParseNode is generated during IndexStatement rewriting
+        if (children.get(0).getClass().isAssignableFrom(CastParseNode.class)
+                && 
children.get(1).getClass().isAssignableFrom(CastParseNode.class)
+                && (!((ColumnParseNode) (((CastParseNode) 
children.get(0)).getChildren().get(0))).getName().equals(
+                
IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, 
VIEW_INDEX_ID))
+                || !((ColumnParseNode) (((CastParseNode) 
children.get(1)).getChildren().get(0))).getName().equals(
+                
IndexUtil.getIndexColumnName(QueryConstants.DEFAULT_COLUMN_FAMILY, 
VIEW_INDEX_ID_DATA_TYPE)))
+        ) {
+            throw new IllegalArgumentException(
+                    "DecodeViewIndexIdParseNode should only have "
+                            + "VIEW_INDEX_ID and VIEW_INDEX_ID_DATA_TYPE parse 
nodes."
+            );
+        }
+
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, 
StatementContext context)
+            throws SQLException {
+        return new DecodeViewIndexIdFunction(children);
+    }
+
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1f66ef11fa..dbd88bde2d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -85,6 +85,7 @@ import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TIMEOUT_DURING_UPGRADE_MS;
 import static org.apache.phoenix.util.UpgradeUtil.addParentToChildLinks;
 import static org.apache.phoenix.util.UpgradeUtil.addViewIndexToParentLinks;
@@ -1352,10 +1353,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing 
coprocessor for indexes.
-            // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table 
because we use
+            // Also don't install on the SYSTEM.STATS table because we use
             // all-or-none mutate class which break when this coprocessor is 
installed (PHOENIX-1318).
+            // With PHOENIX-7107 which introduced indexes on SYSTEM.CATALOG we 
need to install the
+            // indexing coprocessor on SYSTEM.CATALOG
             if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW 
&& !isViewIndex)
-                    && !SchemaUtil.isMetaTable(tableName)
                     && !SchemaUtil.isStatsTable(tableName)) {
                 if (isTransactional) {
                     if 
(!newDesc.hasCoprocessor(QueryConstants.PHOENIX_TRANSACTIONAL_INDEXER_CLASSNAME))
 {
@@ -1785,8 +1787,26 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             TableDescriptorBuilder newDesc = 
generateTableDescriptor(physicalTableName, parentPhysicalTableName, 
existingDesc, tableType, props, families,
                     splits, isNamespaceMapped);
 
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("ensureTableCreated " +
+                                "physicalTableName = {}, " +
+                                "parentPhysicalTableName = {}, " +
+                                "isUpgradeRequired = {}, " +
+                                "isAutoUpgradeEnabled = {}, " +
+                                "isDoNotUpgradePropSet = {}, " +
+                                "isNamespaceMapped = {}, " +
+                                "createdNamespace = {}",
+                        Bytes.toString(physicalTableName),
+                        Bytes.toString(parentPhysicalTableName),
+                        isUpgradeRequired(),
+                        isAutoUpgradeEnabled,
+                        isDoNotUpgradePropSet,
+                        isNamespaceMapped,
+                        createdNamespace);
+            }
+
             if (!tableExist) {
-                if (SchemaUtil.isSystemTable(physicalTableName) && 
!isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) {
+                if (SchemaUtil.isSystemTable(physicalTableName) && (tableType 
== PTableType.TABLE || tableType == PTableType.SYSTEM) && !isUpgradeRequired() 
&& (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) {
                     // Disallow creating the SYSTEM.CATALOG or SYSTEM:CATALOG 
HBase table
                     throw new UpgradeRequiredException();
                 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index be3801d118..bc9320723c 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -431,6 +431,7 @@ public interface QueryServices extends SQLCloseable {
     // As opposed to a copy and async (out of band) delete.
     public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED = 
"phoenix.move.child_link.during.upgrade";
 
+    String SYSTEM_CATALOG_INDEXES_ENABLED = 
"phoenix.system.catalog.indexes.enabled";
     /**
      * Parameter to indicate the source of operation attribute.
      * It can include metadata about the customer, service, etc.
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9755572566..3bd94e4ffa 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -450,6 +450,8 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_PHOENIX_GET_METADATA_READ_LOCK_ENABLED 
= true;
 
     public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT 
= 300000; // 5 minutes
+    public static final boolean DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED = false;
+
 
     public static final Boolean 
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b07b0228ab..d04f01bf3b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1824,9 +1824,11 @@ public class MetaDataClient {
             }
 
             Configuration config = 
connection.getQueryServices().getConfiguration();
+            // Do descendant view validation only when enabled and not a 
SYSTEM table/index
             if (!connection.getQueryServices().getProps()
                 .getBoolean(DISABLE_VIEW_SUBTREE_VALIDATION,
-                    DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION)) {
+                    DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION) &&
+                    
!QueryConstants.SYSTEM_SCHEMA_NAME.equals(dataTable.getSchemaName().getString()))
 {
                 verifyIfDescendentViewsExtendPk(dataTable, config);
             }
             // for view indexes
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 11fdb8dbd3..4a16b1f32c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -83,8 +83,11 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static 
org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.SKIP_SYSTEM_TABLES_EXISTENCE_CHECK;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE_BYTES;
+import static 
org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED;
 import static org.apache.phoenix.schema.PTable.LinkType.PHYSICAL_TABLE;
 import static 
org.apache.phoenix.schema.PTable.LinkType.VIEW_INDEX_PARENT_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES;
@@ -203,6 +206,7 @@ import 
org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -272,6 +276,7 @@ import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -608,6 +613,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
     }
 
     private static boolean failConcurrentMutateAddColumnOneTimeForTesting = 
false;
+    private static final String FORCE_INDEX_MUTATE_METADATA_AS_ATTRIB = 
String.valueOf(Integer.MAX_VALUE);
     private RegionCoprocessorEnvironment env;
 
     private PhoenixMetaDataCoprocessorHost phoenixAccessCoprocessorHost;
@@ -1671,7 +1677,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         isSalted, baseColumnCount, isRegularView, 
columnTimestamp);
             }
         }
-        if (tableType == INDEX && ! isThisAViewIndex) {
+        // Ignore meta indexes when looking up maxLookbackAge and TTL on parent tables,
+        // due to failures in namespace-related ITs when isNamespaceMappingEnabled is enabled.
+        boolean isMetaIndex = 
(QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName.getString()) && (tableType 
== INDEX));
+        if (tableType == INDEX && ! isThisAViewIndex && !isMetaIndex) {
             byte[] tableKey = SchemaUtil.getTableKey(tenantId == null ? null : 
tenantId.getBytes(),
                     parentSchemaName == null ? null : 
parentSchemaName.getBytes(), parentTableName.getBytes());
             maxLookbackAge = scanMaxLookbackAgeFromParent(tableKey, 
clientTimeStamp);
@@ -1679,7 +1688,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setMaxLookbackAge(maxLookbackAge != null ? maxLookbackAge :
                 (oldTable != null ? oldTable.getMaxLookbackAge() : null));
 
-        if (tableType == INDEX && !isThisAViewIndex && 
ttl.equals(TTL_EXPRESSION_NOT_DEFINED)) {
+        if (tableType == INDEX && !isThisAViewIndex && 
ttl.equals(TTL_EXPRESSION_NOT_DEFINED) && !isMetaIndex) {
             //If this is an index on Table get TTL from Table
             byte[] tableKey = getTableKey(tenantId == null ? null : 
tenantId.getBytes(),
                     parentSchemaName == null ? null : 
parentSchemaName.getBytes(),
@@ -2798,14 +2807,23 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     }
                 }
 
+
+                // NOTE: the TODO below may no longer be relevant, since PHOENIX-7107 introduces
+                // indexes on the system table.
                 // TODO: Switch this to HRegion#batchMutate when we want to 
support indexes on the
                 // system table. Basically, we get all the locks that we don't 
already hold for all the
                 // tableMetadata rows. This ensures we don't have deadlock 
situations (ensuring
                 // primary and then index table locks are held, in that 
order). For now, we just don't support
                 // indexing on the system table. This is an issue because of 
the way we manage batch mutation
                 // in the Indexer.
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
localMutations, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+
+                // Update SYSTEM.CATALOG indexes only for ordinary (non-system) table/index
+                // mutations (create table/index). When a system table or system index is itself
+                // being created, no further index processing is required.
+                boolean updateCatalogIndexes = 
!SchemaUtil.isSystemTable(Bytes.toBytes(fullTableName));
+                mutateRowsWithLocks(this.accessCheckEnabled, env, region, 
localMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE,
+                        updateCatalogIndexes);
 
                 // Invalidate the cache - the next getTable call will add it
                 // TODO: consider loading the table that was just created 
here, patching up the parent table, and updating the cache
@@ -3138,10 +3156,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     }
                     throw new IllegalStateException(msg);
                 }
+                // Update SYSTEM.CATALOG indexes only for ordinary table/index mutations
+                // (drop table/index). When an index in the SYSTEM schema is itself being dropped,
+                // no further index processing is required.
+                boolean
+                        updateCatalogIndexes =
+                        (pTableType != INDEX) || (!Bytes.toString(schemaName)
+                                
.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA));
 
                 // drop rows from catalog on this region
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
localMutations,
-                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, env, region, 
localMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE,
+                        updateCatalogIndexes);
 
                 long currentTime = 
MetaDataUtil.getClientTimeStamp(tableMetadata);
                 for (ImmutableBytesPtr ckey : invalidateList) {
@@ -3367,7 +3393,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 }
                 Delete delete = new Delete(kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength(),
                         clientTimeStamp);
-                catalogMutations.add(delete);
+                // Remove duplicate mutations if present
+                if (Bytes.compareTo(key, 0, key.length, kv.getRowArray(), 
kv.getRowOffset(), kv.getRowLength()) != 0) {
+                    catalogMutations.add(delete);
+                }
                 results.clear();
                 scanner.next(results);
             } while (!results.isEmpty());
@@ -3746,8 +3775,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         throw new IllegalStateException(msg);
                     }
                 }
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
localMutations,
-                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
+                // Update SYSTEM.CATALOG indexes only for ordinary table 
column mutations.
+                // Column mutations of indexes are not allowed. See above
+                // Add column on SYSTEM.CATALOG should not be processed for 
index updates,
+                // since an index on a future column cannot exist.
+                boolean
+                        updateCatalogIndexes =
+                        !Bytes.toString(schemaName)
+                                
.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+
+                mutateRowsWithLocks(this.accessCheckEnabled, env, region, 
localMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE,
+                        updateCatalogIndexes);
                 // Invalidate from cache
                 for (ImmutableBytesPtr invalidateKey : invalidateList) {
                     metaDataCache.invalidate(invalidateKey);
@@ -4583,8 +4622,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     long serverTimestamp = 
EnvironmentEdgeManager.currentTimeMillis();
                     tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(
                             key, clientTimeStamp, serverTimestamp));
-                    mutateRowsWithLocks(this.accessCheckEnabled, region, 
tableMetadata, Collections.<byte[]>emptySet(),
-                        HConstants.NO_NONCE, HConstants.NO_NONCE);
+                    boolean updateCatalogIndexes = !Bytes.toString(schemaName)
+                            
.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+
+                    mutateRowsWithLocks(this.accessCheckEnabled, env, region, 
tableMetadata,
+                            Collections.<byte[]>emptySet(), 
HConstants.NO_NONCE,
+                            HConstants.NO_NONCE, updateCatalogIndexes);
                     // Invalidate from cache
                     Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                             
GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -4835,7 +4878,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 // Don't store function info for temporary functions.
                 if (!temporaryFunction) {
                     mutateRowsWithLocks(this.accessCheckEnabled, region, 
functionMetaData,
-                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
+                            Collections.<byte[]>emptySet(), 
HConstants.NO_NONCE,
+                            HConstants.NO_NONCE);
                 }
 
                 // Invalidate the cache - the next getFunction call will add it
@@ -4891,8 +4935,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
functionMetaData, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, region, 
functionMetaData,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
 
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env).getMetaDataCache();
                 long currentTime = 
MetaDataUtil.getClientTimeStamp(functionMetaData);
@@ -5013,8 +5057,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         return;
                     }
                 }
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
schemaMutations, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, region, 
schemaMutations,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
 
                 // Invalidate the cache - the next getSchema call will add it
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
@@ -5065,8 +5109,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
                 }
-                mutateRowsWithLocks(this.accessCheckEnabled, region, 
schemaMetaData, Collections.<byte[]>emptySet(),
-                    HConstants.NO_NONCE, HConstants.NO_NONCE);
+                mutateRowsWithLocks(this.accessCheckEnabled, region, 
schemaMetaData,
+                        Collections.<byte[]>emptySet(), HConstants.NO_NONCE, 
HConstants.NO_NONCE);
                 Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(this.env)
                         .getMetaDataCache();
                 long currentTime = 
MetaDataUtil.getClientTimeStamp(schemaMetaData);
@@ -5135,18 +5179,56 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
     /**
      * Perform atomic mutations on rows within a region
-     *
+     * and additionally set index metadata on the mutations if catalog indexes exist.
      * @param accessCheckEnabled Use the login user to mutate rows if enabled
-     * @param region Region containing rows to be mutated
-     * @param mutations List of mutations for rows that must be contained 
within the region
-     * @param rowsToLock Rows to lock
-     * @param nonceGroup Optional nonce group of the operation
-     * @param nonce Optional nonce of the operation
+     * @param env                The RegionCoprocessorEnvironment
+     * @param region             Region containing rows to be mutated
+     * @param mutations          List of mutations for rows that must be 
contained within the
+     *                           region
+     * @param rowsToLock         Rows to lock
+     * @param nonceGroup         Optional nonce group of the operation
+     * @param nonce              Optional nonce of the operation
+     * @param updateCatalogIndexes whether to set index metadata on the mutations when catalog indexes exist
      * @throws IOException
      */
+
+    static void mutateRowsWithLocks(final boolean accessCheckEnabled,
+            final RegionCoprocessorEnvironment env, final Region region,
+            final List<Mutation> mutations, final Set<byte[]> rowsToLock, 
final long nonceGroup,
+            final long nonce, boolean updateCatalogIndexes) throws IOException 
{
+
+        try {
+            Configuration conf = env.getConfiguration();
+            boolean catalogIndexesEnabled = 
conf.getBoolean(SYSTEM_CATALOG_INDEXES_ENABLED, 
DEFAULT_SYSTEM_CATALOG_INDEXES_ENABLED);
+            if ((updateCatalogIndexes) && (catalogIndexesEnabled)) {
+                setMetaDataOnMutationsIfCatalogIndexExists(env, mutations );
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Setting metadata on mutations :" + 
mutations);
+                }
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+
+        mutateRowsWithLocks(accessCheckEnabled, region, mutations, rowsToLock, 
nonceGroup, nonce);
+    }
+
+
+    /**
+     * Perform atomic mutations on rows within a region
+     * @param accessCheckEnabled Use the login user to mutate rows if enabled
+     * @param region             Region containing rows to be mutated
+     * @param mutations          List of mutations for rows that must be contained within the
+     *                           region
+     * @param rowsToLock         Rows to lock
+     * @param nonceGroup         Optional nonce group of the operation
+     * @param nonce              Optional nonce of the operation
+     * @throws IOException
+     */
     static void mutateRowsWithLocks(final boolean accessCheckEnabled, final 
Region region,
             final List<Mutation> mutations, final Set<byte[]> rowsToLock, 
final long nonceGroup,
             final long nonce) throws IOException {
+
         // We need to mutate SYSTEM.CATALOG or SYSTEM.CHILD_LINK with 
HBase/login user
         // if access is enabled.
         if (accessCheckEnabled) {
@@ -5172,6 +5254,40 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
     }
 
+    /**
+     * Sets index metadata on the given mutations when SYSTEM.CATALOG has at least one index;
+     * returns without touching the mutations when it has none.
+     * <p>
+     * The SYSTEM.CATALOG PTable is taken from the GlobalCache metadata cache when present,
+     * otherwise it is fetched with {@code PhoenixRuntime.getTableNoCache} over a server connection.
+     *
+     * @param env       the region coprocessor environment (source of the cache and configuration)
+     * @param mutations the mutations that the serialized index data is set on
+     * @throws IOException  if the SYSTEM.CATALOG PTable cannot be obtained
+     * @throws SQLException propagated from obtaining the server connection or from the
+     *                      index metadata calls
+     */
+    private static void setMetaDataOnMutationsIfCatalogIndexExists(
+            final RegionCoprocessorEnvironment env, final List<Mutation> mutations)
+            throws SQLException, IOException {
+        final byte[] key = SchemaUtil.getTableKey(null,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+                PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+        final ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+
+        // First choice: the metadata cache held by this region server.
+        PTable catalogTable =
+                (PTable) GlobalCache.getInstance(env).getMetaDataCache().getIfPresent(cacheKey);
+        if (catalogTable == null) {
+            // Cache miss: resolve the table through a short-lived server connection.
+            try (PhoenixConnection lookupConn =
+                    getServerConnectionForMetaData(new Properties(), env.getConfiguration())
+                            .unwrap(PhoenixConnection.class)) {
+                catalogTable = PhoenixRuntime.getTableNoCache(lookupConn,
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+            } catch (SQLException sqle) {
+                throw new IOException(String.format("Failed to get PTable for SYSTEM.CATALOG via getTableNoCache() : key = %s" , Bytes.toString(key)), sqle);
+            }
+            if (catalogTable == null) {
+                throw new IOException(String.format("Failed to get PTable for SYSTEM.CATALOG via getTableNoCache() and  GlobalCache: key = %s" , Bytes.toString(key)));
+            }
+        }
+
+        if (catalogTable.getIndexes().isEmpty()) {
+            LOGGER.debug("No indexes found for SYSTEM.CATALOG: key = {}", Bytes.toString(key));
+            return;
+        }
+
+        // NOTE(review): the threshold is set to Integer.MAX_VALUE; judging by the constant's name this
+        // presumably forces the index metadata to travel as mutation attributes - confirm.
+        final Properties metaConnectionProps = new Properties();
+        metaConnectionProps.setProperty(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
+                FORCE_INDEX_MUTATE_METADATA_AS_ATTRIB);
+        try (PhoenixConnection metaConn =
+                getServerConnectionForMetaData(metaConnectionProps, env.getConfiguration())
+                        .unwrap(PhoenixConnection.class)) {
+            final ImmutableBytesWritable serializedIndexData = new ImmutableBytesWritable();
+            IndexMaintainer.serialize(catalogTable, serializedIndexData, metaConn);
+            IndexMetaDataCacheClient.setMetaDataOnMutations(metaConn, catalogTable, mutations,
+                    serializedIndexData);
+        }
+    }
+
     private TableName getParentPhysicalTableName(PTable table) {
         return (table
                 .getType() == PTableType.VIEW || (table.getType() == INDEX && 
table.getViewIndexId() != null))
@@ -5197,7 +5313,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
      * @return Connection object.
      * @throws SQLException If the Connection could not be retrieved.
      */
-    private static Connection getServerConnectionForMetaData(final 
Configuration config)
+    private static Connection   getServerConnectionForMetaData(final 
Configuration config)
             throws SQLException {
         Preconditions.checkNotNull(config, "The configs must not be null");
         return getServerConnectionForMetaData(new Properties(), config);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 8a3a8f681b..845155f6cc 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -162,6 +162,8 @@ public class TaskRegionObserver implements RegionObserver, 
RegionCoprocessor {
         public void run() {
             PhoenixConnection connForTask = null;
             try {
+                // TODO: Insufficient info is available when namespaces are enabled and index rebuild tasks are run;
+                //  getConnectionOnServer can fail when namespaces are enabled but the SYSTEM namespace is not available
                 connForTask = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
                 String[] excludeStates = new String[] { 
PTable.TaskStatus.FAILED.toString(),
                         PTable.TaskStatus.COMPLETED.toString() };
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java
index c4072127aa..b3e68b8212 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseRowKeyMatcherTestIT.java
@@ -90,6 +90,7 @@ import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
@@ -108,6 +109,8 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LINK_HBASE_
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -504,7 +507,42 @@ public abstract class BaseRowKeyMatcherTestIT extends 
ParallelStatsDisabledIT {
 
     }
 
-    // Helper to get rowKeyMatcher from Metadata.
+    byte[] getRowKeyMatcherFromSyscatIndex(String tenantId, String schemaName,
+            String tableName, boolean useIndexTable) throws SQLException {
+
+        final String
+                SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL =
+                "SELECT ROW_KEY_MATCHER FROM SYSTEM.CATALOG " + "WHERE %s AND 
TABLE_SCHEM <> 'SYSTEM' AND TABLE_NAME = '%s' AND  ROW_KEY_MATCHER IS NOT NULL";
+        final String  SYS_CATALOG_IDX_ROW_KEY_MATCHER_HEADER_SQL = "SELECT 
\"0:ROW_KEY_MATCHER\" FROM SYSTEM.SYS_ROW_KEY_MATCHER_IDX " + "WHERE %s AND 
\":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'";
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            Statement stmt = connection.createStatement();
+            String
+                    tenantClause = useIndexTable ?
+                    (tenantId == null || tenantId.isEmpty() ?
+                            "\":TENANT_ID\" IS NULL" :
+                            String.format("\":TENANT_ID\" = '%s'", tenantId)) :
+                    (tenantId == null || tenantId.isEmpty() ?
+                            "TENANT_ID IS NULL" :
+                            String.format("TENANT_ID = '%s'", tenantId));
+            String
+                    sql = useIndexTable ?
+                    String.format(SYS_CATALOG_IDX_ROW_KEY_MATCHER_HEADER_SQL, 
tenantClause, schemaName,
+                            tableName) :
+                    String.format(SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL, 
tenantClause,
+                            tableName);
+            stmt.execute(sql);
+            ResultSet rs = stmt.getResultSet();
+            byte[] matcherBytes = rs.next() ? rs.getBytes(1) : 
EMPTY_BYTE_ARRAY;
+            LOGGER.info("Row key matcher SQL: {}", sql);
+            LOGGER.info("Row key matcher: {}, {}",
+                    Bytes.toStringBinary(matcherBytes),
+                    
Bytes.toStringBinary(PVarbinaryEncoded.INSTANCE.toBytes(matcherBytes)));
+            return PVarbinaryEncoded.INSTANCE.toBytes(matcherBytes);
+        }
+    }
+
+    // Helper to get rowKeyMatcher from Metadata.
     private Pair<String, byte[]> getRowKeyMatchersFromView(PhoenixConnection 
connection,
             PTable view) throws SQLException {
         return getRowKeyMatchersFromView(connection, 
view.getName().getString());
@@ -560,6 +598,12 @@ public abstract class BaseRowKeyMatcherTestIT extends 
ParallelStatsDisabledIT {
                 PVarbinaryEncoded.INSTANCE.toBytes(
                         WhereOptimizer.getRowKeyMatcher(connection, tableName, 
viewStatementTable,
                                 viewColumnConstantsToBe, 
isViewColumnReferencedToBe));
+        byte[]
+                rowKeyMatcher3 = 
getRowKeyMatcherFromSyscatIndex(view.getTenantId() != null ? 
view.getTenantId().getString() : null, view.getSchemaName().getString(), 
view.getTableName().getString(), false);
+
+        byte[]
+                rowKeyMatcher4 = 
getRowKeyMatcherFromSyscatIndex(view.getTenantId() != null ? 
view.getTenantId().getString() : null, view.getSchemaName().getString(), 
view.getTableName().getString(), true);
+
         LOGGER.debug(String.format(
                 "target-view-name = %s, physical = %s, stmt-table = %s\n, " +
                         "row-matcher-0 = %s (syscat)\n, row-matcher-1 = %s\n, 
row-matcher-2 = %s\n",
@@ -571,6 +615,10 @@ public abstract class BaseRowKeyMatcherTestIT extends 
ParallelStatsDisabledIT {
                 Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher1) == 0);
         assertTrue("RowKey matcher patterns do not match",
                 Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher2) == 0);
+        assertTrue("RowKey matcher patterns do not match",
+                Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher3) == 0);
+        assertTrue("RowKey matcher patterns do not match",
+                Bytes.compareTo(rowKeyInfo.getSecond(), rowKeyMatcher4) == 0);
         return rowKeyMatcher1;
     }
 
@@ -808,6 +856,15 @@ public abstract class BaseRowKeyMatcherTestIT extends 
ParallelStatsDisabledIT {
             List<PDataType[]> testCases = getTestCases();
             SortOrder[][] sortOrders = getSortOrders();
 
+            try (Connection conn = DriverManager.getConnection(getUrl());
+                    Statement stmt = conn.createStatement()) {
+                //TestUtil.dumpTable(conn, 
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
+                stmt.execute("CREATE INDEX IF NOT EXISTS SYS_VIEW_HDR_IDX ON 
SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY) 
INCLUDE (TABLE_TYPE, VIEW_STATEMENT, TTL, ROW_KEY_MATCHER) WHERE TABLE_TYPE = 
'v'");
+                stmt.execute("CREATE INDEX IF NOT EXISTS 
SYS_ROW_KEY_MATCHER_IDX ON SYSTEM.CATALOG(ROW_KEY_MATCHER, TTL, TABLE_TYPE, 
TENANT_ID, TABLE_SCHEM, TABLE_NAME) INCLUDE (VIEW_STATEMENT) WHERE TABLE_TYPE = 
'v' AND ROW_KEY_MATCHER IS NOT NULL");
+                stmt.execute("CREATE INDEX IF NOT EXISTS 
SYS_VIEW_INDEX_HDR_IDX ON SYSTEM.CATALOG(DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, 
VIEW_INDEX_ID_DATA_TYPE), TENANT_ID, TABLE_SCHEM, TABLE_NAME) 
INCLUDE(TABLE_TYPE, LINK_TYPE, VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE)  WHERE 
TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND VIEW_INDEX_ID IS NOT NULL");
+                conn.commit();
+            }
+
             String tableName = "";
             tableName = createViewHierarchy(
                     testCases, sortOrders, 500, 5000, 3,
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java
new file mode 100644
index 0000000000..781cc9263a
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialSystemCatalogIndexIT.java
@@ -0,0 +1,988 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.TaskRegionObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ViewTTLIT;
+import 
org.apache.phoenix.hbase.index.write.TestTrackingParallelWriterIndexCommitter;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TableViewFinderResult;
+import org.apache.phoenix.util.ViewUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_INDEX_SYSTEM_TABLE;
+import static org.apache.phoenix.exception.SQLExceptionCode.MISMATCHED_TOKEN;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriter.INDEX_COMMITTER_CONF_KEY;
+import static 
org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.COLUMN_TYPES;
+import static 
org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.TENANT_VIEW_COLUMNS;
+import static 
org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static 
org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
+import static 
org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
+import static 
org.apache.phoenix.query.QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialSystemCatalogIndexIT extends ParallelStatsDisabledIT {
+    static final Logger LOGGER = LoggerFactory.getLogger(PartialSystemCatalogIndexIT.class);
+    static final int VIEW_TTL_10_SECS = 10;
+    static final int VIEW_TTL_300_SECS = 300;
+    static final int VIEW_TTL_120_SECS = 120;
+
+    // Various Test System indexes
+    private final static String SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME = 
"INDEX_TABLE_LINK_TEST_INDEX";
+    private final static String FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME = 
QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME;
+    private final static String SYS_INDEX_HDR_TEST_INDEX_NAME = 
"INDEX_HDR_TEST_INDEX";
+    private final static String FULL_SYS_INDEX_HDR_TEST_INDEX_NAME = 
QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_INDEX_HDR_TEST_INDEX_NAME;
+    private final static String SYS_VIEW_HDR_TEST_INDEX_NAME = 
"VIEW_HDR_TEST_INDEX";
+    private final static String FULL_SYS_VIEW_HDR_TEST_INDEX_NAME = 
QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_VIEW_HDR_TEST_INDEX_NAME;
+    private final static String SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME = 
"VIEW_INDEX_HDR_TEST_INDEX";
+    private final static String FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME = 
QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME;
+    private final static String SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME = 
"ROW_KEY_MATCHER_TEST_INDEX";
+    private final static String FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME = 
QueryConstants.SYSTEM_SCHEMA_NAME + "." + SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME;
+
+    // System Index creation statements.
+    // Each statement is a CREATE INDEX ... ON SYSTEM.CATALOG with a WHERE clause, i.e. a partial
+    // index; the single %s placeholder is filled with the (unqualified) index name.
+
+    // Index over index table link rows: TABLE_TYPE = 'i' AND LINK_TYPE = 1.
+    private final static String SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL =  
String.format(
+            "CREATE INDEX IF NOT EXISTS %s " +
+                    "ON SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, 
COLUMN_NAME, COLUMN_FAMILY) INCLUDE(TABLE_TYPE, LINK_TYPE) " +
+                    "WHERE TABLE_TYPE = 'i' AND LINK_TYPE = 1", 
SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME);
+    // Index over index header rows: TABLE_TYPE = 'i' AND INDEX_STATE IS NOT NULL.
+    // INDEX_STATE and DATA_TABLE_NAME are part of the index key.
+    private final static String SYS_INDEX_HDR_TEST_INDEX_SQL =  String.format(
+            "CREATE INDEX IF NOT EXISTS %s " +
+                    "ON SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, 
COLUMN_NAME, COLUMN_FAMILY, INDEX_STATE, DATA_TABLE_NAME) INCLUDE(TABLE_TYPE) " 
+
+                    "WHERE TABLE_TYPE = 'i' AND INDEX_STATE IS NOT NULL", 
SYS_INDEX_HDR_TEST_INDEX_NAME);
+    // Index over view rows (TABLE_TYPE = 'v'), covering VIEW_STATEMENT, TTL and ROW_KEY_MATCHER.
+    private final static String SYS_VIEW_HDR_TEST_INDEX_SQL =  String.format(
+            "CREATE INDEX IF NOT EXISTS %s " +
+                    "ON SYSTEM.CATALOG(TENANT_ID, TABLE_SCHEM, TABLE_NAME, 
COLUMN_NAME, COLUMN_FAMILY) " +
+                    "INCLUDE (TABLE_TYPE, VIEW_STATEMENT, TTL, 
ROW_KEY_MATCHER) " +
+                    "WHERE TABLE_TYPE = 'v'", SYS_VIEW_HDR_TEST_INDEX_NAME);
+    // Index keyed by ROW_KEY_MATCHER first, restricted to view rows whose ROW_KEY_MATCHER
+    // is not null.
+    private final static String SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL =  
String.format(
+            "CREATE INDEX IF NOT EXISTS %s " +
+                    "ON SYSTEM.CATALOG(ROW_KEY_MATCHER, TTL, TABLE_TYPE, 
TENANT_ID, TABLE_SCHEM, TABLE_NAME) " +
+                    "INCLUDE (VIEW_STATEMENT) " +
+                    "WHERE TABLE_TYPE = 'v' AND ROW_KEY_MATCHER IS NOT NULL", 
SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME);
+    // Functional index whose leading key is DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID,
+    // VIEW_INDEX_ID_DATA_TYPE); restricted to TABLE_TYPE = 'i' rows with no LINK_TYPE and a
+    // non-null VIEW_INDEX_ID.
+    private final static String SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL =  
String.format(
+            "CREATE INDEX IF NOT EXISTS %s " +
+                    "ON SYSTEM.CATALOG(DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, 
VIEW_INDEX_ID_DATA_TYPE), TENANT_ID, TABLE_SCHEM, TABLE_NAME) " +
+                    "INCLUDE(TABLE_TYPE, LINK_TYPE, VIEW_INDEX_ID, 
VIEW_INDEX_ID_DATA_TYPE)  " +
+                    "WHERE TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND 
VIEW_INDEX_ID IS NOT NULL", SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME);
+
+    // SQLs on the data table - SYSTEM.CATALOG.
+    // These are String.format templates; where a template starts with "WHERE %s", that first
+    // placeholder receives a complete tenant predicate built by the caller
+    // ("TENANT_ID IS NULL" or "TENANT_ID = '<id>'").
+
+    // Placeholders: tenant predicate, TABLE_NAME. The schema is not a placeholder here; rows
+    // are only required to be outside the SYSTEM schema.
+    static final String SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL = "SELECT 
ROW_KEY_MATCHER FROM SYSTEM.CATALOG "
+            + "WHERE %s AND TABLE_SCHEM <> 'SYSTEM' AND TABLE_NAME = '%s' AND 
" + "ROW_KEY_MATCHER IS NOT NULL";
+
+    // Placeholders: tenant predicate, TABLE_SCHEM, TABLE_NAME. Selects TTL of view rows.
+    static final String SYS_CATALOG_VIEW_TTL_HEADER_SQL = "SELECT TTL FROM 
SYSTEM.CATALOG "
+            + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND 
TABLE_TYPE = 'v'";
+
+    // Placeholders: tenant predicate, TABLE_SCHEM, TABLE_NAME. Selects VIEW_INDEX_ID of
+    // index rows that have no LINK_TYPE.
+    static final String SYS_CATALOG_VIEW_INDEX_HEADER_SQL = "SELECT 
VIEW_INDEX_ID FROM SYSTEM.CATALOG "
+            + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND 
TABLE_TYPE = 'i' AND LINK_TYPE IS NULL";
+
+    // Single placeholder: TABLE_NAME of an entity in the SYSTEM schema. Counts its rows.
+    static final String SYS_CATALOG_SYS_INDEX_TABLE_SQL = "SELECT count(*) 
FROM SYSTEM.CATALOG " +
+            "WHERE TABLE_SCHEM = 'SYSTEM' AND TABLE_NAME = '%s'";
+
+    // Placeholders: tenant predicate, TABLE_SCHEM, TABLE_NAME. Counts link rows with
+    // TABLE_TYPE = 'i' AND LINK_TYPE = 1.
+    static final String SYS_CATALOG_INDEX_TABLE_LINK_SQL = "SELECT count(*) 
FROM SYSTEM.CATALOG " +
+            "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND 
TABLE_TYPE = 'i'" +
+            " AND LINK_TYPE = 1";
+
+    // Placeholders: tenant predicate, TABLE_SCHEM, TABLE_NAME. Counts index header rows
+    // (TABLE_TYPE = 'i' AND INDEX_STATE IS NOT NULL).
+    static final String SYS_CATALOG_INDEX_HDR_SQL = "SELECT count(*) FROM 
SYSTEM.CATALOG " +
+            "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND 
TABLE_TYPE = 'i'" +
+            " AND INDEX_STATE IS NOT NULL";
+
+    // Placeholders: TABLE_SCHEM, TABLE_NAME, COLUMN_NAME (no tenant predicate). Counts
+    // matching column rows.
+    static final String SYS_CATALOG_COLUMN_EXISTS_SQL = "SELECT count(*) FROM 
SYSTEM.CATALOG " +
+            "WHERE TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = 
'%s'" ;
+
+    // The templates below query the SYSTEM.CATALOG index tables directly. Placeholders, in
+    // order: index table name, tenant predicate on ":TENANT_ID", ":TABLE_SCHEM", ":TABLE_NAME".
+    // Index columns are referenced by their quoted "<family>:<column>" names.
+
+    // SQL on the index table - SYSTEM.SYS_INDEX_TABLE_LINK_TEST_INDEX
+    static final String SYS_CATALOG_IDX_INDEX_TABLE_LINK_SQL = "SELECT 
\":COLUMN_FAMILY\" FROM %s " +
+            "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ;
+
+    // SQL on the index table - SYSTEM.SYS_INDEX_HDR_TEST_INDEX_NAME
+    static final String SYS_CATALOG_IDX_INDEX_HDR_SQL = "SELECT 
\"0:DATA_TABLE_NAME\", \"0:INDEX_STATE\" FROM %s " +
+            "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ;
+
+    // SQL on the index table - SYSTEM.SYS_VIEW_HDR_TEST_INDEX
+    static final String SYS_CATALOG_IDX_VIEW_HEADER_SQL = "SELECT 
\"0:VIEW_STATEMENT\" FROM %s " +
+            "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ;
+
+    // SQL on the index table - SYSTEM.SYS_VIEW_INDEX_HDR_TEST_INDEX
+    // NOTE(review): the selected column name contains a space after the colon; presumably this
+    // mirrors how the functional index column is named - confirm before "tidying" it.
+    static final String SYS_CATALOG_IDX_VIEW_INDEX_HEADER_SQL = "SELECT \": 
DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID,VIEW_INDEX_ID_DATA_TYPE)\" FROM %s " +
+            "WHERE %s AND \":TABLE_SCHEM\" = '%s' AND \":TABLE_NAME\" = '%s'" ;
+
+    // Coprocessor environment of TaskRegionObserver, looked up in doSetup(); handed to
+    // TaskRegionObserver.SelfHealingTask in dropTableWithChildViews().
+    private static RegionCoprocessorEnvironment taskRegionEnvironment;
+    // Testing utility created in doSetup() and used there to start the mini cluster.
+    private static HBaseTestingUtility hbaseTestUtil;
+
+    /**
+     * One-time setup for this test class.
+     *
+     * Installs a {@link ConfigurationFactory} whose configurations have
+     * SYSTEM_CATALOG_INDEXES_ENABLED set to true, starts the mini cluster, registers the test
+     * driver with the feature flags these tests rely on, and captures the
+     * {@link TaskRegionObserver} coprocessor environment from the first region of the
+     * SYSTEM_TASK_HBASE_TABLE_NAME table.
+     *
+     * The task handling interval and initial delay are set to Long.MAX_VALUE, presumably so
+     * that the periodic task handler does not fire on its own; tests trigger it explicitly
+     * (see dropTableWithChildViews).
+     *
+     * @throws Exception if the cluster or the driver cannot be started
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        InstanceResolver.clearSingletons();
+        // Override to get required config for static fields loaded that require HBase config
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
+
+            @Override public Configuration getConfiguration() {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(SYSTEM_CATALOG_INDEXES_ENABLED, String.valueOf(true));
+                return conf;
+            }
+
+            @Override public Configuration getConfiguration(Configuration confToClone) {
+                Configuration conf = HBaseConfiguration.create();
+                conf.set(SYSTEM_CATALOG_INDEXES_ENABLED, String.valueOf(true));
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+        Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        conf.set(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        conf.set(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        hbaseTestUtil = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        hbaseTestUtil.startMiniCluster();
+
+        // Driver properties: enable SYSTEM.CATALOG indexes plus the table/view TTL features.
+        // A plain HashMap is used instead of double-brace initialisation, which would create
+        // a throw-away anonymous HashMap subclass.
+        Map<String, String> driverProps = new HashMap<>();
+        driverProps.put(QueryServices.SYSTEM_CATALOG_INDEXES_ENABLED, String.valueOf(true));
+        driverProps.put(QueryServices.PHOENIX_TABLE_TTL_ENABLED, String.valueOf(true));
+        // no max lookback
+        driverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+                Integer.toString(0));
+        driverProps.put(QueryServices.PHOENIX_VIEW_TTL_ENABLED, Boolean.toString(true));
+        driverProps.put(QueryServices.PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT,
+                String.valueOf(1));
+        driverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        driverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+
+        setUpTestDriver(new ReadOnlyProps(ReadOnlyProps.EMPTY_PROPS,
+                driverProps.entrySet().iterator()));
+
+        taskRegionEnvironment =
+                getUtility()
+                        .getRSForFirstRegionInTable(
+                                PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(TaskRegionObserver.class.getName());
+
+    }
+
+
+    /**
+     * Asserts that SYSTEM.CATALOG contains exactly one index header row
+     * (TABLE_TYPE = 'i' AND INDEX_STATE IS NOT NULL) for the given entity.
+     *
+     * @param tenantId   tenant to match; null or empty matches rows with TENANT_ID IS NULL
+     * @param schemaName value matched against TABLE_SCHEM
+     * @param tableName  value matched against TABLE_NAME
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogHasIndexHdr(String tenantId, String schemaName,
+            String tableName) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "TENANT_ID IS NULL"
+                : String.format("TENANT_ID = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_INDEX_HDR_SQL, tenantClause, schemaName,
+                tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            int numRows = rs.next() ? rs.getInt(1) : 0;
+
+            assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                    schemaName, tableName), 1, numRows);
+        }
+    }
+
+    /**
+     * Asserts whether SYSTEM.CATALOG has a row for the given column of the given entity.
+     *
+     * @param schemaName    value matched against TABLE_SCHEM
+     * @param indexName     value matched against TABLE_NAME
+     * @param newColumnName value matched against COLUMN_NAME
+     * @param exists        true to expect exactly one matching row, false to expect none
+     * @throws SQLException if the query fails
+     */
+    void assertAdditionalColumnInMetaIndexTable(String schemaName, String indexName,
+            String newColumnName, boolean exists) throws SQLException {
+
+        String sql = String.format(SYS_CATALOG_COLUMN_EXISTS_SQL, schemaName, indexName,
+                newColumnName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            int numRows = rs.next() ? rs.getInt(1) : 0;
+
+            assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                    schemaName, indexName), exists ? 1 : 0, numRows);
+        }
+    }
+
+    /**
+     * Asserts that SYSTEM.CATALOG contains exactly one index table link row
+     * (TABLE_TYPE = 'i' AND LINK_TYPE = 1) for the given entity.
+     *
+     * @param tenantId   tenant to match; null or empty matches rows with TENANT_ID IS NULL
+     * @param schemaName value matched against TABLE_SCHEM
+     * @param tableName  value matched against TABLE_NAME
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogHasIndexTableLinks(String tenantId, String schemaName,
+            String tableName) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "TENANT_ID IS NULL"
+                : String.format("TENANT_ID = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_INDEX_TABLE_LINK_SQL, tenantClause, schemaName,
+                tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            int numRows = rs.next() ? rs.getInt(1) : 0;
+
+            assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                    schemaName, tableName), 1, numRows);
+        }
+    }
+
+    /**
+     * Asserts whether SYSTEM.CATALOG exposes a VIEW_INDEX_ID for the index header row
+     * (TABLE_TYPE = 'i' AND LINK_TYPE IS NULL) of the given entity.
+     *
+     * Only the first returned row is inspected. With {@code exists == false} the assertion
+     * passes both when no row is returned and when the first row's VIEW_INDEX_ID reads as null.
+     *
+     * @param tenantId   tenant to match; null or empty matches rows with TENANT_ID IS NULL
+     * @param schemaName value matched against TABLE_SCHEM
+     * @param tableName  value matched against TABLE_NAME
+     * @param exists     true to expect a non-null VIEW_INDEX_ID, false to expect none
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogHasViewIndexHeaderRelatedColumns(String tenantId, String schemaName,
+            String tableName, boolean exists) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "TENANT_ID IS NULL"
+                : String.format("TENANT_ID = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_VIEW_INDEX_HEADER_SQL, tenantClause, schemaName,
+                tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            String viewIndexId = rs.next() ? rs.getString(1) : null;
+            if (exists) {
+                assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), viewIndexId);
+            } else {
+                assertNull(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), viewIndexId);
+            }
+
+        }
+    }
+
+    /**
+     * Asserts the TTL stored on the view header row (TABLE_TYPE = 'v') of the given entity.
+     *
+     * With {@code exists == true} the TTL of the first returned row is compared with
+     * {@code ttlValueExpected}; a missing row or a null TTL is treated as 0. With
+     * {@code exists == false} the query must return no rows and {@code ttlValueExpected}
+     * is ignored.
+     *
+     * @param tenantId         tenant to match; null or empty matches TENANT_ID IS NULL
+     * @param schemaName       value matched against TABLE_SCHEM
+     * @param tableName        value matched against TABLE_NAME
+     * @param exists           whether a view header row is expected
+     * @param ttlValueExpected expected TTL value when {@code exists} is true
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogHasViewHeaderRelatedColumns(String tenantId, String schemaName,
+            String tableName, boolean exists, long ttlValueExpected) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "TENANT_ID IS NULL"
+                : String.format("TENANT_ID = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_VIEW_TTL_HEADER_SQL, tenantClause, schemaName,
+                tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            if (exists) {
+                String ttlStr = rs.next() ? rs.getString(1) : null;
+                // Parsed as long to match the type of ttlValueExpected; Integer parsing would
+                // reject values beyond the int range.
+                long actualTTLValueReturned = ttlStr != null ? Long.parseLong(ttlStr) : 0;
+                assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), ttlValueExpected, actualTTLValueReturned);
+            } else {
+                assertFalse(String.format("Rows do exists for schema = %s, table = %s",
+                        schemaName, tableName), rs.next());
+
+            }
+        }
+    }
+
+    /**
+     * Asserts whether SYSTEM.CATALOG holds a non-null ROW_KEY_MATCHER for the given entity.
+     *
+     * The query matches on tenant and TABLE_NAME and only requires TABLE_SCHEM to differ from
+     * 'SYSTEM'; {@code schemaName} is used in the failure message only.
+     *
+     * @param tenantId   tenant to match; null or empty matches rows with TENANT_ID IS NULL
+     * @param schemaName schema name reported in the assertion message (not part of the query)
+     * @param tableName  value matched against TABLE_NAME
+     * @param exists     true to expect a ROW_KEY_MATCHER value, false to expect no rows
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogHasRowKeyMatcherRelatedColumns(String tenantId, String schemaName,
+            String tableName, boolean exists) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "TENANT_ID IS NULL"
+                : String.format("TENANT_ID = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_ROW_KEY_MATCHER_HEADER_SQL, tenantClause,
+                tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            if (exists) {
+                byte[] matcherBytes = rs.next() ? rs.getBytes(1) : null;
+                assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), matcherBytes);
+            } else {
+                assertFalse(String.format("Rows do exists for schema = %s, table = %s",
+                        schemaName, tableName), rs.next());
+
+            }
+        }
+    }
+
+    /** Returns {@code name} with every double-quote character removed. */
+    String stripQuotes(String name) {
+        final String doubleQuote = "\"";
+        return name.replace(doubleQuote, "");
+    }
+
+    /**
+     * Asserts whether SYSTEM.CATALOG has any rows for the given entity in the SYSTEM schema.
+     *
+     * @param systemCatalogIndexName value matched against TABLE_NAME (TABLE_SCHEM is 'SYSTEM')
+     * @param exists                 true to expect at least one row, false to expect none
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogIndexTable(String systemCatalogIndexName, boolean exists)
+            throws SQLException {
+
+        // The template has a single placeholder, so the name is passed once.
+        String sql = String.format(SYS_CATALOG_SYS_INDEX_TABLE_SQL, systemCatalogIndexName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            // Guard the read with rs.next() instead of ignoring its result.
+            int numRows = rs.next() ? rs.getInt(1) : 0;
+            assertTrue(String.format("Expected rows do not match for index-table = SYSTEM.%s",
+                    systemCatalogIndexName), exists ? numRows > 0 : numRows == 0);
+        }
+    }
+
+    /**
+     * Queries a SYSTEM.CATALOG index table for the header row of {@code indexName} and checks
+     * its "0:DATA_TABLE_NAME" and "0:INDEX_STATE" values.
+     *
+     * Only the first returned row is inspected. The row is looked up by {@code indexName}
+     * (matched against ":TABLE_NAME"); {@code tableName} is the expected data table name.
+     *
+     * @param systemCatalogIndexName index table to query
+     * @param tenantId               tenant to match; null or empty matches ":TENANT_ID" IS NULL
+     * @param schemaName             value matched against ":TABLE_SCHEM"
+     * @param tableName              expected "0:DATA_TABLE_NAME" when {@code exists} is true
+     * @param exists                 true to expect a matching row, false to expect none
+     * @param indexName              value matched against ":TABLE_NAME"
+     * @param expectedIndexState     expected "0:INDEX_STATE" when {@code exists} is true
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogIndexHaveIndexHdr(String systemCatalogIndexName,
+            String tenantId, String schemaName,
+            String tableName, boolean exists, String indexName, String expectedIndexState)
+            throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "\":TENANT_ID\" IS NULL"
+                : String.format("\":TENANT_ID\" = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_IDX_INDEX_HDR_SQL, systemCatalogIndexName,
+                tenantClause, schemaName, indexName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            boolean hasRows = rs.next();
+            String actualDataTableName = hasRows ? rs.getString(1) : null;
+            String actualIndexState = hasRows ? rs.getString(2) : null;
+            if (exists) {
+                assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, indexName), tableName, actualDataTableName);
+                assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, indexName), expectedIndexState, actualIndexState);
+            } else {
+                assertNull(String.format("Zero rows expected for schema = %s, table = %s",
+                        schemaName, indexName), actualDataTableName);
+                assertNull(String.format("Zero rows expected for schema = %s, table = %s",
+                        schemaName, indexName), actualIndexState);
+            }
+        }
+    }
+
+
+    /**
+     * Queries a SYSTEM.CATALOG index table for the link row of the given entity and checks the
+     * ":COLUMN_FAMILY" value, which the callers compare against the linked index name.
+     *
+     * Only the first returned row is inspected.
+     *
+     * @param systemCatalogIndexName index table to query
+     * @param tenantId               tenant to match; null or empty matches ":TENANT_ID" IS NULL
+     * @param schemaName             value matched against ":TABLE_SCHEM"
+     * @param tableName              value matched against ":TABLE_NAME"
+     * @param exists                 true to expect a matching row, false to expect none
+     * @param indexName              expected ":COLUMN_FAMILY" when {@code exists} is true
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogIndexHaveIndexTableLinks(String systemCatalogIndexName,
+            String tenantId, String schemaName,
+            String tableName, boolean exists, String indexName) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "\":TENANT_ID\" IS NULL"
+                : String.format("\":TENANT_ID\" = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_IDX_INDEX_TABLE_LINK_SQL, systemCatalogIndexName,
+                tenantClause, schemaName, tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            String colFamilyStr = rs.next() ? rs.getString(1) : null;
+            if (exists) {
+                assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), indexName, colFamilyStr);
+            } else {
+                assertNull(String.format("Zero rows expected for schema = %s, table = %s",
+                        schemaName, tableName), colFamilyStr);
+            }
+        }
+    }
+
+    /**
+     * Queries a SYSTEM.CATALOG index table for the view header row of the given entity and
+     * checks whether a "0:VIEW_STATEMENT" value is present.
+     *
+     * Only the first returned row is inspected. With {@code exists == false} the assertion
+     * passes both when no row is returned and when the first row's value reads as null.
+     *
+     * @param systemCatalogIndexName index table to query
+     * @param tenantId               tenant to match; null or empty matches ":TENANT_ID" IS NULL
+     * @param schemaName             value matched against ":TABLE_SCHEM"
+     * @param tableName              value matched against ":TABLE_NAME"
+     * @param exists                 true to expect a non-null view statement, false for none
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogIndexHaveViewHeaders(String systemCatalogIndexName,
+            String tenantId, String schemaName,
+            String tableName, boolean exists) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "\":TENANT_ID\" IS NULL"
+                : String.format("\":TENANT_ID\" = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_IDX_VIEW_HEADER_SQL, systemCatalogIndexName,
+                tenantClause, schemaName, tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            String viewStmt = rs.next() ? rs.getString(1) : null;
+            if (exists) {
+                assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), viewStmt);
+            } else {
+                assertNull(String.format("Zero rows expected for schema = %s, table = %s",
+                        schemaName, tableName), viewStmt);
+            }
+        }
+    }
+
+
+    /**
+     * Queries a SYSTEM.CATALOG index table for the view-index header row of the given entity
+     * and checks whether a row carrying the decoded view index id is present.
+     *
+     * Only the first returned row is inspected, and presence is decided by whether a row was
+     * returned at all: the id is read with {@code getInt}, so any returned row yields a
+     * non-null value here.
+     *
+     * @param systemCatalogIndexName index table to query
+     * @param tenantId               tenant to match; null or empty matches ":TENANT_ID" IS NULL
+     * @param schemaName             value matched against ":TABLE_SCHEM"
+     * @param tableName              value matched against ":TABLE_NAME"
+     * @param exists                 true to expect a matching row, false to expect none
+     * @throws SQLException if the query fails
+     */
+    void assertSystemCatalogIndexHaveViewIndexHeaders(String systemCatalogIndexName,
+            String tenantId, String schemaName,
+            String tableName, boolean exists) throws SQLException {
+
+        String tenantClause = tenantId == null || tenantId.isEmpty()
+                ? "\":TENANT_ID\" IS NULL"
+                : String.format("\":TENANT_ID\" = '%s'", tenantId);
+        String sql = String.format(SYS_CATALOG_IDX_VIEW_INDEX_HEADER_SQL, systemCatalogIndexName,
+                tenantClause, schemaName, tableName);
+        // Statement and ResultSet are closed explicitly rather than relying on the
+        // connection close to release them.
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            // Box explicitly so the "no row" case is an obvious null rather than a mixed
+            // int/null conditional.
+            Integer viewIndexId = rs.next() ? Integer.valueOf(rs.getInt(1)) : null;
+            if (exists) {
+                assertNotNull(String.format("Expected rows do not match for schema = %s, table = %s",
+                        schemaName, tableName), viewIndexId);
+            } else {
+                assertNull(String.format("Zero rows expected for schema = %s, table = %s",
+                        schemaName, tableName), viewIndexId);
+            }
+        }
+    }
+
+    /**
+     * Drops the named index on SYSTEM.CATALOG over a fresh connection and commits.
+     *
+     * @param sysIndexName name of the index to drop
+     * @throws SQLException if the statement fails
+     */
+    void dropSystemCatalogIndex(String sysIndexName) throws SQLException {
+        final String dropSql = String.format("drop index %s ON SYSTEM.CATALOG", sysIndexName);
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement statement = connection.createStatement()) {
+            statement.execute(dropSql);
+            connection.commit();
+        }
+    }
+
+    /**
+     * Issues "alter index &lt;indexName&gt; ON &lt;tableName&gt; &lt;state&gt;" over a fresh
+     * connection and commits.
+     *
+     * @param indexName index to alter
+     * @param tableName entity the index is defined on
+     * @param newState  state whose enum name is appended to the statement
+     * @throws SQLException if the statement fails
+     */
+    void alterIndexState(String indexName, String tableName, PIndexState newState)
+            throws SQLException {
+        final String alterSql =
+                String.format("alter index %s ON %s %s", indexName, tableName, newState.name());
+        try (Connection connection = DriverManager.getConnection(getUrl());
+                Statement statement = connection.createStatement()) {
+            statement.execute(alterSql);
+            connection.commit();
+        }
+    }
+
+    /**
+     * Drops {@code baseTable} with CASCADE, runs the task handler to drop its child views and
+     * verifies that no child links remain.
+     *
+     * Steps: empty the task table, issue DROP TABLE ... CASCADE, run
+     * {@link TaskRegionObserver.SelfHealingTask} {@code numTaskRuns} times, assert that a
+     * DROP_CHILD_VIEWS task is COMPLETED, then assert that the child-link table has no
+     * CHILD_TABLE links for the base table.
+     *
+     * @param baseTable   full name of the base table to drop
+     * @param numTaskRuns number of task handling runs to execute; per the original note each
+     *                    non-root level of the view tree is processed in one run, so this
+     *                    should be at least the depth of the view tree
+     * @throws Exception if any SQL, task or HBase operation fails
+     */
+    void dropTableWithChildViews(String baseTable, int numTaskRuns) throws Exception {
+        // Drop the base table
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            // One Statement, closed explicitly, serves both statements.
+            try (Statement stmt = conn.createStatement()) {
+                // Empty the task table first.
+                stmt.execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+
+                String dropTableSQL = String.format("DROP TABLE IF EXISTS %s CASCADE", baseTable);
+                stmt.execute(dropTableSQL);
+            }
+
+            // Run DropChildViewsTask to complete the tasks for dropping child views.
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            taskRegionEnvironment,
+                            QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+            for (int i = 0; i < numTaskRuns; i++) {
+                task.run();
+            }
+
+            assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(),
+                    PTable.TaskType.DROP_CHILD_VIEWS,
+                    null, null, null, null, null);
+
+            // Views should be dropped by now
+            TableName linkTable =
+                    TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
+            TableViewFinderResult childViewsResult = new TableViewFinderResult();
+            // Close the HBase table handle once the lookup is done.
+            try (org.apache.hadoop.hbase.client.Table childLinkTable =
+                    getUtility().getConnection().getTable(linkTable)) {
+                ViewUtil.findAllRelatives(childLinkTable,
+                        HConstants.EMPTY_BYTE_ARRAY,
+                        SchemaUtil.getSchemaNameFromFullName(baseTable).getBytes(),
+                        SchemaUtil.getTableNameFromFullName(baseTable).getBytes(),
+                        PTable.LinkType.CHILD_TABLE,
+                        childViewsResult);
+            }
+            assertEquals(0, childViewsResult.getLinks().size());
+        }
+
+
+    }
+
+    /**
+     * Asserts the columns of the first task-table row of the given task type.
+     *
+     * A row must exist and its TASK_STATUS must equal {@code expectedStatus}. Each remaining
+     * expectation is checked only when it is non-null.
+     *
+     * @param conn              open connection used for the query; not closed by this method
+     * @param expectedStatus    expected TASK_STATUS value
+     * @param taskType          task type to filter on (by serialized value)
+     * @param expectedTableName expected TABLE_NAME, or null to skip
+     * @param expectedTenantId  expected TENANT_ID, or null to skip
+     * @param expectedSchema    expected TABLE_SCHEM, or null to skip
+     * @param expectedTs        expected TASK_TS, or null to skip
+     * @param expectedIndexName index name expected inside TASK_DATA, or null to skip
+     * @throws SQLException if the query fails
+     */
+    static void assertTaskColumns(Connection conn, String expectedStatus,
+            PTable.TaskType taskType,
+            String expectedTableName, String expectedTenantId, String expectedSchema,
+            Timestamp expectedTs,
+            String expectedIndexName)
+            throws SQLException {
+        String sql = "SELECT * " +
+                " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+                " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+                taskType.getSerializedValue();
+        // The connection belongs to the caller; only the Statement and ResultSet created
+        // here are closed.
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            assertTrue(rs.next());
+            String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
+            assertEquals(expectedStatus, taskStatus);
+
+            if (expectedTableName != null) {
+                String tableName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME);
+                assertEquals(expectedTableName, tableName);
+            }
+
+            if (expectedTenantId != null) {
+                String tenantId = rs.getString(PhoenixDatabaseMetaData.TENANT_ID);
+                assertEquals(expectedTenantId, tenantId);
+            }
+
+            if (expectedSchema != null) {
+                String schema = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM);
+                assertEquals(expectedSchema, schema);
+            }
+
+            if (expectedTs != null) {
+                Timestamp ts = rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS);
+                assertEquals(expectedTs, ts);
+            }
+
+            if (expectedIndexName != null) {
+                String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
+                assertTrue(data.contains("\"IndexName\":\"" + expectedIndexName));
+            }
+        }
+    }
+
+    /**
+     * Runs "EXPLAIN " + query on a fresh connection and returns the first column of every
+     * result row, in order.
+     *
+     * @param query statement to explain
+     * @param props connection properties passed to the driver
+     * @return the plan lines; empty if the result set has no rows
+     * @throws SQLException if the connection or the query fails
+     */
+    private List<String> getExplain(String query, Properties props) throws SQLException {
+        final String explainSql = "EXPLAIN " + query;
+        final List<String> planLines = new ArrayList<>();
+        try (Connection connection = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement explainStmt = connection.prepareStatement(explainSql);
+                ResultSet rows = explainStmt.executeQuery()) {
+            for (boolean more = rows.next(); more; more = rows.next()) {
+                planLines.add(rows.getString(1));
+            }
+        }
+        return planLines;
+    }
+
+
+    /**
+     * Builds a base table, a global view with a TTL, and a tenant view for a new tenant.
+     *
+     * The base table is created with COLUMN_ENCODED_BYTES=0 and MULTI_TENANT=true and the
+     * global view with table property "TTL=&lt;globalTTL&gt;". When {@code allowIndex} is true a
+     * non-local global view index and a tenant view index are configured as well.
+     *
+     * @param globalTTL              TTL value placed in the global view's TTL property
+     * @param tenantViewOptions      tenant view options, or null to use the defaults
+     * @param tenantViewIndexOptions tenant view index options, or null to use the defaults
+     * @param allowIndex             whether to configure the view indexes
+     * @return the schema builder after the entities have been built
+     * @throws Exception if building the schema fails
+     */
+    protected PhoenixTestBuilder.SchemaBuilder createLevel2TenantViewWithGlobalLevelTTL(
+            int globalTTL,
+            PhoenixTestBuilder.SchemaBuilder.TenantViewOptions tenantViewOptions,
+            PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions tenantViewIndexOptions,
+            boolean allowIndex) throws Exception {
+        // Define the test schema.
+        // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder =
+                new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        PhoenixTestBuilder.SchemaBuilder.TableOptions baseTableOptions =
+                PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        baseTableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        // The TTL of the global view comes from the caller.
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions globalViewOptions =
+                PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("TTL=%d", globalTTL));
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+                PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        // Caller-supplied options win over the defaults.
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions defaultTenantViewOptions =
+                PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions effectiveTenantViewOptions =
+                tenantViewOptions != null ? tenantViewOptions : defaultTenantViewOptions;
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions defaultTenantViewIndexOptions =
+                PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions effectiveTenantViewIndexOptions =
+                tenantViewIndexOptions != null
+                        ? tenantViewIndexOptions
+                        : defaultTenantViewIndexOptions;
+
+        // Same call order as a single fluent chain; index options are added only on request.
+        PhoenixTestBuilder.SchemaBuilder chain = schemaBuilder
+                .withTableOptions(baseTableOptions)
+                .withGlobalViewOptions(globalViewOptions);
+        if (allowIndex) {
+            chain = chain.withGlobalViewIndexOptions(globalViewIndexOptions);
+        }
+        chain = chain.withTenantViewOptions(effectiveTenantViewOptions);
+        if (allowIndex) {
+            chain = chain.withTenantViewIndexOptions(effectiveTenantViewIndexOptions);
+        }
+        chain.buildWithNewTenant();
+        return schemaBuilder;
+    }
+
+    /**
+     * Builds a base table with a tenant view (and tenant view index) directly on top of it.
+     *
+     * The base table is created with COLUMN_ENCODED_BYTES=0 and MULTI_TENANT=true.
+     *
+     * @param tenantViewOptions      tenant view options, or null to use the defaults
+     * @param tenantViewIndexOptions tenant view index options, or null to use the defaults
+     * @return the schema builder after the view has been built
+     * @throws Exception if building the schema fails
+     */
+    protected PhoenixTestBuilder.SchemaBuilder createLevel1TenantView(
+            PhoenixTestBuilder.SchemaBuilder.TenantViewOptions tenantViewOptions,
+            PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions tenantViewIndexOptions)
+            throws Exception {
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder =
+                new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        PhoenixTestBuilder.SchemaBuilder.TableOptions baseTableOptions =
+                PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        baseTableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        // Caller-supplied options win over the defaults.
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions defaultTenantViewOptions =
+                PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions effectiveTenantViewOptions =
+                tenantViewOptions != null ? tenantViewOptions : defaultTenantViewOptions;
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions defaultTenantViewIndexOptions =
+                PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions effectiveTenantViewIndexOptions =
+                tenantViewIndexOptions != null
+                        ? tenantViewIndexOptions
+                        : defaultTenantViewIndexOptions;
+
+        PhoenixTestBuilder.SchemaBuilder chain = schemaBuilder.withTableOptions(baseTableOptions);
+        chain = chain.withTenantViewOptions(effectiveTenantViewOptions);
+        chain = chain.withTenantViewIndexOptions(effectiveTenantViewIndexOptions);
+        chain.buildNewView();
+        return schemaBuilder;
+    }
+
+    @Test
+    public void testIndexesOfIndexTableLinkTypeAndIndexHdrCondition() throws 
Exception {
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
+                tenantViewOptions = new 
PhoenixTestBuilder.SchemaBuilder.TenantViewOptions();
+        
tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS));
+        
tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES));
+
+        // Create 2 level view
+        final PhoenixTestBuilder.SchemaBuilder
+                schemaBuilder = 
createLevel2TenantViewWithGlobalLevelTTL(VIEW_TTL_300_SECS, tenantViewOptions, 
null,
+                true);
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String fullBaseTableName = schemaBuilder.getEntityTableName();
+        String schemaName = stripQuotes(
+                
SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String fullGlobalViewName = schemaBuilder.getEntityGlobalViewName();
+        String tenantViewName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalIndexName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewIndexName()));
+        String tenantIndexName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewIndexName()));
+
+        // Assert View Header rows exists for global view
+        assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, 
globalViewName, true, VIEW_TTL_300_SECS);
+        // Assert View Header rows exists for tenant view
+        assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, 
tenantViewName, true, 0);
+
+        // Assert index table link rows (link_type = 1) exists in SYSTEM. 
CATALOG
+        assertSystemCatalogHasIndexTableLinks(null, schemaName, 
globalViewName);
+        assertSystemCatalogHasIndexTableLinks(tenantId, schemaName, 
tenantViewName);
+
+        // Assert index table header rows (table_type = 'i' AND INDEX_STATE IS 
NOT NULL) exists in SYSTEM. CATALOG
+        assertSystemCatalogHasIndexHdr(null, schemaName, globalIndexName);
+        assertSystemCatalogHasIndexHdr(tenantId, schemaName, tenantIndexName);
+
+        //Create the SYSTEM.CATALOG index for Index Table links
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL);
+            stmt.execute(SYS_INDEX_HDR_TEST_INDEX_SQL);
+            conn.commit();
+        }
+        LOGGER.info("Finished creating index: " + 
SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL);
+        LOGGER.info("Finished creating index: " + 
SYS_INDEX_HDR_TEST_INDEX_SQL);
+
+        // Assert System Catalog index table has been created
+        assertSystemCatalogIndexTable(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, 
true);
+        assertSystemCatalogIndexTable(SYS_INDEX_HDR_TEST_INDEX_NAME, true);
+        // Assert appropriate rows are inserted in the SYSTEM.CATALOG index 
tables
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 null, schemaName, globalViewName,
+                true, globalIndexName);
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 tenantId, schemaName, tenantViewName,
+                true, tenantIndexName);
+
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, 
schemaName, globalViewName,
+                true, globalIndexName, 
PIndexState.ACTIVE.getSerializedValue());
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, 
tenantId, schemaName, tenantViewName,
+                true, tenantIndexName, 
PIndexState.ACTIVE.getSerializedValue());
+
+        // Alter the index state and verify meta index
+        alterIndexState(globalIndexName, fullGlobalViewName, 
PIndexState.UNUSABLE);
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, 
schemaName, globalViewName,
+                true, globalIndexName, 
PIndexState.INACTIVE.getSerializedValue());
+
+        alterIndexState(globalIndexName, fullGlobalViewName, 
PIndexState.REBUILD);
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, 
schemaName, globalViewName,
+                true, globalIndexName, 
PIndexState.ACTIVE.getSerializedValue());
+
+        LOGGER.info("Dropping base table " + fullBaseTableName);
+        dropTableWithChildViews(fullBaseTableName, 2);
+        assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, 
globalViewName,
+                false, VIEW_TTL_300_SECS);
+        assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, 
tenantViewName,
+                false, 0);
+
+        // Assert appropriate rows are dropped/deleted in the SYSTEM.CATALOG 
index tables
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 null, schemaName, globalViewName, false, null);
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 tenantId, schemaName, tenantViewName, false, null);
+
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, null, 
schemaName, null,
+                false, globalIndexName, null);
+        
assertSystemCatalogIndexHaveIndexHdr(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, 
tenantId, schemaName, null,
+                false, tenantIndexName, null);
+
+        dropSystemCatalogIndex(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME);
+        dropSystemCatalogIndex(SYS_INDEX_HDR_TEST_INDEX_NAME);
+
+        // Assert System Catalog index table dropped
+        
assertSystemCatalogIndexTable(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, false);
+        assertSystemCatalogIndexTable(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, 
false);
+    }
+
+    /**
+     * Exercises three partial SYSTEM.CATALOG indexes (view headers, row-key-matcher rows and
+     * view-index headers) against a single-level tenant view created with a TTL.
+     * <p>
+     * The test first asserts the expected rows in SYSTEM.CATALOG, creates the indexes, asserts
+     * that the index tables were created and populated, checks that EXPLAIN picks each index,
+     * then drops the base table with its child views and asserts that both the SYSTEM.CATALOG
+     * rows and the index rows are gone before dropping the indexes themselves.
+     */
+    @Test
+    public void testIndexesOfViewAndIndexHeadersCondition() throws Exception {
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
+                tenantViewOptions = 
PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+        // View TTL is set to 120s => 120000 ms
+        tenantViewOptions.setTableProps(String.format("TTL=%d", 
VIEW_TTL_120_SECS));
+
+        final PhoenixTestBuilder.SchemaBuilder
+                schemaBuilder = createLevel1TenantView(tenantViewOptions, 
null);
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String fullBaseTableName = schemaBuilder.getEntityTableName();
+        String schemaName = stripQuotes(
+                
SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String tenantViewName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String indexOnTenantViewName = String
+                .format("IDX_%s", 
stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+        // TABLE_TYPE = 'v'
+        // Expected 1 rows - one for TenantView.
+        // Since the TTL property values are being set,
+        // we expect the view header columns to show up in regular queries
+        assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, 
tenantViewName,
+                true, VIEW_TTL_120_SECS);
+        // Assert index header rows (link_type IS NULL AND TABLE_TYPE = 'i') exist in
+        // SYSTEM.CATALOG
+        assertSystemCatalogHasViewIndexHeaderRelatedColumns(tenantId, 
schemaName, indexOnTenantViewName,true);
+
+        assertSystemCatalogHasRowKeyMatcherRelatedColumns(tenantId, 
schemaName, tenantViewName,true);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            //TestUtil.dumpTable(conn, 
TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
+            stmt.execute(SYS_VIEW_HDR_TEST_INDEX_SQL);
+            stmt.execute(SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL);
+            stmt.execute(SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL);
+
+            conn.commit();
+        }
+        LOGGER.info("Finished creating index: " + SYS_VIEW_HDR_TEST_INDEX_SQL);
+        LOGGER.info("Finished creating index: " + 
SYS_ROW_KEY_MATCHER_TEST_INDEX_SQL);
+        LOGGER.info("Finished creating index: " + 
SYS_VIEW_INDEX_HDR_TEST_INDEX_SQL);
+
+        /**
+         * Testing creation of SYS_INDEX rows
+         */
+
+        // Assert System Catalog index table has been created
+        assertSystemCatalogIndexTable(SYS_VIEW_HDR_TEST_INDEX_NAME, true);
+        assertSystemCatalogIndexTable(SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, 
true);
+        assertSystemCatalogIndexTable(SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, 
true);
+        // Assert appropriate rows are inserted in the SYSTEM.CATALOG index 
tables
+        
assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, 
tenantId, schemaName, tenantViewName, true);
+        
assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME,
 tenantId, schemaName, tenantViewName, true);
+        
assertSystemCatalogIndexHaveViewIndexHeaders(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME,
 tenantId, schemaName, indexOnTenantViewName, true);
+
+        /**
+         * Testing explain plans
+         */
+
+        List<String> plans = getExplain("select TENANT_ID, TABLE_SCHEM, 
TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, TABLE_TYPE FROM SYSTEM.CATALOG WHERE 
TABLE_TYPE = 'v' ", new Properties());
+        assertEquals(String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", 
FULL_SYS_VIEW_HDR_TEST_INDEX_NAME), plans.get(0));
+
+        plans = getExplain("select VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE FROM 
SYSTEM.CATALOG WHERE TABLE_TYPE = 'i' AND LINK_TYPE IS NULL AND VIEW_INDEX_ID 
IS NOT NULL", new Properties());
+        assertEquals(String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s", 
FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME), plans.get(0));
+
+        plans = getExplain("select ROW_KEY_MATCHER, TTL, TABLE_NAME FROM 
SYSTEM.CATALOG WHERE TABLE_TYPE = 'v' AND ROW_KEY_MATCHER IS NOT NULL", new 
Properties());
+        assertEquals(String.format("CLIENT PARALLEL 1-WAY RANGE SCAN OVER %s 
[not null]", FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME), plans.get(0));
+
+        /**
+         * Testing cleanup of SYS_INDEX rows after dropping tables and views
+         */
+        LOGGER.info("Dropping base table " + fullBaseTableName);
+        dropTableWithChildViews(fullBaseTableName, 2);
+        // Assert view header rows (link_type IS NULL AND TABLE_TYPE = 'v') 
does not exist in SYSTEM.CATALOG
+        assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, 
tenantViewName,
+                false, VIEW_TTL_120_SECS);
+        // Assert view header rows (ROW_KEY_MATCHER IS NOT NULL) do not exist in SYSTEM.CATALOG
+        assertSystemCatalogHasRowKeyMatcherRelatedColumns(tenantId, 
schemaName, tenantViewName,false);
+        // Assert index header rows (link_type IS NULL AND TABLE_TYPE = 'i') do not exist in
+        // SYSTEM.CATALOG. The lookup uses the index name, mirroring the positive assertion made
+        // before the drop.
+        assertSystemCatalogHasViewIndexHeaderRelatedColumns(tenantId, schemaName,
+                indexOnTenantViewName, false);
+
+        // Assert appropriate rows are dropped/deleted in the SYSTEM.CATALOG 
index tables
+        
assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, 
tenantId, schemaName, tenantViewName, false);
+        
assertSystemCatalogIndexHaveViewHeaders(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME,
 tenantId, schemaName, tenantViewName, false);
+        // View-index header rows were asserted present under the index name, so assert their
+        // removal under that same name.
+        assertSystemCatalogIndexHaveViewIndexHeaders(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME,
+                tenantId, schemaName, indexOnTenantViewName, false);
+
+        dropSystemCatalogIndex(SYS_VIEW_HDR_TEST_INDEX_NAME);
+        dropSystemCatalogIndex(SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME);
+        dropSystemCatalogIndex(SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME);
+
+        // Assert System Catalog index table dropped
+        assertSystemCatalogIndexTable(FULL_SYS_VIEW_HDR_TEST_INDEX_NAME, 
false);
+        
assertSystemCatalogIndexTable(FULL_SYS_ROW_KEY_MATCHER_TEST_INDEX_NAME, false);
+        assertSystemCatalogIndexTable(FULL_SYS_VIEW_INDEX_HDR_TEST_INDEX_NAME, 
false);
+    }
+
+    /**
+     * Verifies that CREATE INDEX is rejected on SYSTEM tables other than SYSTEM.CATALOG.
+     * <p>
+     * Each statement is expected to throw a {@link SQLException}; {@code fail()} guards against
+     * the statement unexpectedly succeeding. SYSTEM.CHILD_LINK, SYSTEM.STATS and SYSTEM.LOG are
+     * expected to fail with CANNOT_INDEX_SYSTEM_TABLE, SYSTEM.FUNCTION with MISMATCHED_TOKEN.
+     */
+    @Test
+    public void testIndexesOnOtherSystemTables() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            // SYSTEM.CHILD_LINK
+            try {
+                stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_LINK_4_IDX 
ON SYSTEM.CHILD_LINK(TENANT_ID, TABLE_SCHEM, TABLE_NAME, LINK_TYPE) WHERE 
LINK_TYPE = 4");
+                fail();
+            } catch (SQLException sqle) {
+                Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), 
sqle.getErrorCode());
+            }
+            // SYSTEM.STATS
+            try {
+                stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_STATS_IDX 
ON SYSTEM.STATS(PHYSICAL_NAME, COLUMN_FAMILY, GUIDE_POST_WIDTH, 
GUIDE_POSTS_ROW_COUNT) WHERE COLUMN_FAMILY = '4'");
+                fail();
+            } catch (SQLException sqle) {
+                Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), 
sqle.getErrorCode());
+            }
+            // SYSTEM.LOG
+            try {
+                stmt.execute("CREATE INDEX IF NOT EXISTS SYS_INDEX_LOG_IDX ON 
SYSTEM.LOG(USER, CLIENT_IP, QUERY) WHERE QUERY_ID = '4'");
+                fail();
+            } catch (SQLException sqle) {
+                Assert.assertEquals(CANNOT_INDEX_SYSTEM_TABLE.getErrorCode(), 
sqle.getErrorCode());
+            }
+
+            // SYSTEM.FUNCTION: a different error code is expected here.
+            // NOTE(review): presumably FUNCTION is a reserved word, so the statement fails in the
+            // parser before the system-table check is reached - confirm.
+            try {
+                stmt.execute("CREATE INDEX IF NOT EXISTS 
SYS_INDEX_FUNCTION_IDX ON SYSTEM.FUNCTION(CLASS_NAME,JAR_PATH) WHERE 
FUNCTION_NAME = '4'");
+                fail();
+            } catch (SQLException sqle) {
+                Assert.assertEquals(MISMATCHED_TOKEN.getErrorCode(), 
sqle.getErrorCode());
+            }
+
+        }
+    }
+
+    /**
+     * Checks that adding a column to SYSTEM.CATALOG with {@code CASCADE INDEX ALL} propagates
+     * to the partial SYSTEM.CATALOG indexes, and that dropping the column removes it again.
+     * <p>
+     * Steps: build a two-level (global + tenant) view hierarchy, assert the expected
+     * SYSTEM.CATALOG rows, create the index-table-link and index-header indexes, add
+     * {@code TEST_COL1}, assert it appears in both index definitions, drop it, assert it is
+     * gone, then drop the indexes.
+     * <p>
+     * Currently ignored; see the TODO below about the deadlock with partial indexes.
+     */
+    @Test
+    @Ignore
+    // TODO: needs to make this test more robust after fixing the deadlock
+    public void testAddColumnWithCascadeOnMetaIndexes() throws Exception {
+
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
+                tenantViewOptions = new 
PhoenixTestBuilder.SchemaBuilder.TenantViewOptions();
+        
tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS));
+        
tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES));
+
+        // Create 2 level view
+        final PhoenixTestBuilder.SchemaBuilder
+                schemaBuilder = 
createLevel2TenantViewWithGlobalLevelTTL(VIEW_TTL_300_SECS, tenantViewOptions, 
null,
+                true);
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String fullBaseTableName = schemaBuilder.getEntityTableName();
+        String schemaName = stripQuotes(
+                
SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String fullGlobalViewName = schemaBuilder.getEntityGlobalViewName();
+        String tenantViewName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalIndexName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewIndexName()));
+        String tenantIndexName = stripQuotes(
+                
SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewIndexName()));
+
+        // Assert View Header rows exists for global view
+        assertSystemCatalogHasViewHeaderRelatedColumns("", schemaName, 
globalViewName, true, VIEW_TTL_300_SECS);
+        // Assert View Header rows exists for tenant view
+        assertSystemCatalogHasViewHeaderRelatedColumns(tenantId, schemaName, 
tenantViewName, true, 0);
+
+        // Assert index table link rows (link_type = 1) exists in SYSTEM. 
CATALOG
+        assertSystemCatalogHasIndexTableLinks(null, schemaName, 
globalViewName);
+        assertSystemCatalogHasIndexTableLinks(tenantId, schemaName, 
tenantViewName);
+
+        // Assert index table header rows (table_type = 'i' AND INDEX_STATE IS 
NOT NULL) exists in SYSTEM. CATALOG
+        assertSystemCatalogHasIndexHdr(null, schemaName, globalIndexName);
+        assertSystemCatalogHasIndexHdr(tenantId, schemaName, tenantIndexName);
+
+        //////////////////////////////////
+        // TODO : fix the deadlock issue when partial index are present
+        //Create the SYSTEM.CATALOG index for Index Table links
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL);
+            stmt.execute(SYS_INDEX_HDR_TEST_INDEX_SQL);
+            conn.commit();
+        }
+        LOGGER.info("Finished creating index: " + 
SYS_INDEX_TABLE_LINK_TEST_INDEX_SQL);
+        LOGGER.info("Finished creating index: " + 
SYS_INDEX_HDR_TEST_INDEX_SQL);
+        ///////////////////////////////////
+
+        // Assert System Catalog index table has been created
+        assertSystemCatalogIndexTable(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, 
true);
+        assertSystemCatalogIndexTable(SYS_INDEX_HDR_TEST_INDEX_NAME, true);
+        // Assert appropriate rows are inserted in the SYSTEM.CATALOG index 
tables
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 null, schemaName, globalViewName,
+                true, globalIndexName);
+        
assertSystemCatalogIndexHaveIndexTableLinks(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME,
 tenantId, schemaName, tenantViewName,
+                true, tenantIndexName);
+
+        // Add a column to SYSTEM.CATALOG and cascade it to every index on the table
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("ALTER TABLE SYSTEM.CATALOG ADD IF NOT EXISTS 
TEST_COL1 INTEGER DEFAULT 5 CASCADE INDEX ALL");
+        }
+
+        // Assert the new column exists in the meta index definition
+        assertAdditionalColumnInMetaIndexTable("SYSTEM", 
SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, "0:TEST_COL1", true);
+        assertAdditionalColumnInMetaIndexTable("SYSTEM", 
SYS_INDEX_HDR_TEST_INDEX_NAME, "0:TEST_COL1", true);
+
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("ALTER TABLE SYSTEM.CATALOG DROP COLUMN TEST_COL1");
+        }
+
+        // Assert the new column has been removed from the meta index 
definition
+        assertAdditionalColumnInMetaIndexTable("SYSTEM", 
SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, "0:TEST_COL1", false);
+        assertAdditionalColumnInMetaIndexTable("SYSTEM", 
SYS_INDEX_HDR_TEST_INDEX_NAME, "0:TEST_COL1", false);
+
+        dropSystemCatalogIndex(SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME);
+        dropSystemCatalogIndex(SYS_INDEX_HDR_TEST_INDEX_NAME);
+
+        // Assert System Catalog index table dropped
+        
assertSystemCatalogIndexTable(FULL_SYS_INDEX_TABLE_LINK_TEST_INDEX_NAME, false);
+        assertSystemCatalogIndexTable(FULL_SYS_INDEX_HDR_TEST_INDEX_NAME, 
false);
+
+    }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000000..4d8cd10a9d
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/write/TestTrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.compat.hbase.CompatUtil;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+public class TestTrackingParallelWriterIndexCommitter extends 
TrackingParallelWriterIndexCommitter {
+    private static final Logger LOGGER =
+            
LoggerFactory.getLogger(TestTrackingParallelWriterIndexCommitter.class);
+
+    private HTableFactory testFactory;
+
+    public static class TestConnectionFactory extends 
ServerUtil.ConnectionFactory {
+
+        private static Map<ServerUtil.ConnectionType, Connection> connections =
+                new ConcurrentHashMap<ServerUtil.ConnectionType, Connection>();
+
+        public static Connection getConnection(final ServerUtil.ConnectionType 
connectionType, final RegionCoprocessorEnvironment env) {
+            final String key = String.format("%s-%s", env.getServerName(), 
connectionType.name().toLowerCase());
+            LOGGER.info("Connecting to {}", key);
+            return connections.computeIfAbsent(connectionType, new 
Function<ServerUtil.ConnectionType, Connection>() {
+                @Override
+                public Connection apply(ServerUtil.ConnectionType t) {
+                    try {
+                        return 
CompatUtil.createShortCircuitConnection(getTypeSpecificConfiguration(connectionType,
 env.getConfiguration()), env);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+
+        public static void shutdown() {
+            synchronized (ServerUtil.ConnectionFactory.class) {
+                for (Connection connection : connections.values()) {
+                    try {
+                        LOGGER.info("Closing connection to {}", 
connection.getClusterId());
+                        connection.close();
+                    } catch (IOException e) {
+                        LOGGER.warn("Unable to close coprocessor connection", 
e);
+                    }
+                }
+                connections.clear();
+            }
+        }
+
+        public static int getConnectionsCount() {
+            return connections.size();
+        }
+
+    }
+
+
+    public static class TestCoprocessorHConnectionTableFactory extends 
IndexWriterUtils.CoprocessorHConnectionTableFactory {
+
+        @GuardedBy("TestCoprocessorHConnectionTableFactory.this")
+        private RegionCoprocessorEnvironment env;
+        private ServerUtil.ConnectionType connectionType;
+
+        TestCoprocessorHConnectionTableFactory(RegionCoprocessorEnvironment 
env,
+                ServerUtil.ConnectionType connectionType) {
+            super(env, connectionType);
+            this.env = env;
+            this.connectionType = connectionType;
+        }
+
+        @Override
+        public Connection getConnection() throws IOException {
+            return TestConnectionFactory.getConnection(connectionType, env);
+        }
+
+        @Override
+        public synchronized void shutdown() {
+            TestConnectionFactory.shutdown();
+        }
+    }
+
+    @Override
+    void setup(HTableFactory factory, ExecutorService pool, Stoppable stop,
+            RegionCoprocessorEnvironment env) {
+        LOGGER.info("Setting up TestCoprocessorHConnectionTableFactory ");
+        testFactory =
+                new TestCoprocessorHConnectionTableFactory(env,
+                        
ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION_WITH_CUSTOM_THREADS);
+        super.setup(testFactory, pool, stop, env);
+
+    }
+
+    @Override
+    public void stop(String why) {
+        LOGGER.info("Stopping TestTrackingParallelWriterIndexCommitter " + 
why);
+        testFactory.shutdown();
+        super.stop(why);
+    }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
index a45f5d4497..691adb9775 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.jdbc;
 import org.apache.hadoop.hbase.*;
 import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl;
 import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import 
org.apache.phoenix.hbase.index.write.TestTrackingParallelWriterIndexCommitter;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.commons.lang3.RandomUtils;
@@ -55,6 +56,7 @@ import static org.apache.hadoop.hbase.ipc.RpcClient.*;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static 
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
+import static 
org.apache.phoenix.hbase.index.write.IndexWriter.INDEX_COMMITTER_CONF_KEY;
 import static 
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT;
 import static 
org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR;
 import static org.apache.phoenix.jdbc.HighAvailabilityGroup.*;
@@ -702,6 +704,9 @@ public class HighAvailabilityTestingUtility {
             // Phoenix Region Server Endpoint needed for metadata caching
             conf.set(REGIONSERVER_COPROCESSOR_CONF_KEY,
                         PhoenixRegionServerEndpointTestImpl.class.getName());
+            conf.set(INDEX_COMMITTER_CONF_KEY,
+                    TestTrackingParallelWriterIndexCommitter.class.getName());
+
         }
     }
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 368a9c52a4..b5f6d67a80 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -23,11 +23,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
 import org.apache.phoenix.end2end.ConnectionQueryServicesTestImpl;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
@@ -35,7 +39,7 @@ import org.apache.phoenix.query.QueryServicesTestImpl;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 
-
+import static 
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * 
@@ -50,16 +54,15 @@ public class PhoenixTestDriver extends 
PhoenixEmbeddedDriver {
 
     private final ReadOnlyProps overrideProps;
     
-    @GuardedBy("this")
     private final QueryServices queryServices;
     
-    @GuardedBy("this")
-    private boolean closed = false;
-
-    @GuardedBy("this")
     private final Map<ConnectionInfo, ConnectionQueryServices>
             connectionQueryServicesMap = new HashMap<>();
 
+    @GuardedBy("closeLock")
+    private volatile boolean closed = false;
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
     public PhoenixTestDriver() {
         this(ReadOnlyProps.EMPTY_PROPS);
     }
@@ -71,9 +74,14 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver 
{
     }
 
+    /**
+     * Returns the driver-wide {@link QueryServices}.
+     * The closed check runs while holding the read side of {@code closeLock}.
+     *
+     * @throws SQLException if the thread is interrupted while waiting for the lock
+     * @throws IllegalStateException if the driver has been closed
+     */
     @Override
-    public synchronized QueryServices getQueryServices() {
-        checkClosed();
-        return queryServices;
+    public QueryServices getQueryServices() throws SQLException {
+        lockInterruptibly(PhoenixTestDriver.LockMode.READ);
+        try {
+            checkClosed();
+            return queryServices;
+        } finally {
+            unlock(PhoenixTestDriver.LockMode.READ);
+        }
     }
 
     @Override
@@ -83,43 +91,60 @@ public class PhoenixTestDriver extends 
PhoenixEmbeddedDriver {
     }
     
+    /**
+     * Opens a connection through the parent driver after verifying, under the read side of
+     * {@code closeLock}, that this driver has not been closed.
+     *
+     * @throws SQLException if the parent fails to connect or the thread is interrupted while
+     *         waiting for the lock
+     * @throws IllegalStateException if the driver has been closed
+     */
     @Override
-    public synchronized Connection connect(String url, Properties info) throws 
SQLException {
-        checkClosed();
-        return super.connect(url, info);
+    public Connection connect(String url, Properties info) throws SQLException 
{
+        lockInterruptibly(PhoenixTestDriver.LockMode.READ);
+        try {
+            checkClosed();
+            return super.connect(url, info);
+        } finally {
+            unlock(PhoenixTestDriver.LockMode.READ);
+        }
     }
     
     @Override // public for testing
-    public synchronized ConnectionQueryServices 
getConnectionQueryServices(String url, Properties infoIn) throws SQLException {
-        checkClosed();
-        final Properties info = PropertiesUtil.deepCopy(infoIn);
-        ConnectionInfo connInfo = ConnectionInfo.create(url, null, info);
-        ConnectionQueryServices connectionQueryServices = 
connectionQueryServicesMap.get(connInfo);
-        if (connectionQueryServices != null) {
+    public ConnectionQueryServices getConnectionQueryServices(String url, 
Properties infoIn) throws SQLException {
+        lockInterruptibly(PhoenixTestDriver.LockMode.READ);
+        try {
+            checkClosed();
+            final Properties info = PropertiesUtil.deepCopy(infoIn);
+            ConnectionInfo connInfo = ConnectionInfo.create(url, null, info);
+            ConnectionQueryServices connectionQueryServices = 
connectionQueryServicesMap.get(connInfo);
+            if (connectionQueryServices != null) {
+                return connectionQueryServices;
+            }
+            info.putAll(connInfo.asProps().asMap());
+            if (connInfo.isConnectionless()) {
+                connectionQueryServices = new 
ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
+            } else {
+                connectionQueryServices = new 
ConnectionQueryServicesTestImpl(queryServices, connInfo, info);
+            }
+            connectionQueryServices.init(url, info);
+            connectionQueryServicesMap.put(connInfo, connectionQueryServices);
             return connectionQueryServices;
+        } finally {
+            unlock(PhoenixTestDriver.LockMode.READ);
         }
-        info.putAll(connInfo.asProps().asMap());
-        if (connInfo.isConnectionless()) {
-            connectionQueryServices = new 
ConnectionlessQueryServicesImpl(queryServices, connInfo, info);
-        } else {
-            connectionQueryServices = new 
ConnectionQueryServicesTestImpl(queryServices, connInfo, info);
-        }
-        connectionQueryServices.init(url, info);
-        connectionQueryServicesMap.put(connInfo, connectionQueryServices);
-        return connectionQueryServices;
     }
-    
-    private synchronized void checkClosed() {
+
+    // Throws IllegalStateException once the closed flag has been set. This method takes no lock
+    // itself; the callers in this class invoke it while holding closeLock.
+    @GuardedBy("closeLock")
+    private void checkClosed() {
         if (closed) {
             throw new IllegalStateException("The Phoenix jdbc test driver has 
been closed.");
         }
     }
     
     @Override
-    public synchronized void close() throws SQLException {
-        if (closed) {
-            return;
+    public void close() throws SQLException {
+        lockInterruptibly(PhoenixTestDriver.LockMode.WRITE);
+
+        try {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        } finally {
+            unlock(PhoenixTestDriver.LockMode.WRITE);
         }
-        closed = true;
         try {
             for (ConnectionQueryServices cqs : 
connectionQueryServicesMap.values()) {
                 cqs.close();
@@ -134,4 +159,41 @@ public class PhoenixTestDriver extends 
PhoenixEmbeddedDriver {
             }
         }
     }
+    /** Selects which side of {@code closeLock} is acquired or released. */
+    private enum LockMode {
+        READ,
+        WRITE
+    }
+
+    /**
+     * Acquires the read or write side of {@code closeLock}, honoring thread interruption.
+     *
+     * @param mode which side of the lock to take; must not be null
+     * @throws SQLException with code INTERRUPTED_EXCEPTION if the thread is interrupted while
+     *         waiting; the interrupt flag is restored first
+     */
+    private void lockInterruptibly(PhoenixTestDriver.LockMode mode) throws SQLException {
+        checkNotNull(mode);
+        try {
+            if (mode == PhoenixTestDriver.LockMode.READ) {
+                closeLock.readLock().lockInterruptibly();
+            } else if (mode == PhoenixTestDriver.LockMode.WRITE) {
+                closeLock.writeLock().lockInterruptibly();
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status before translating to the SQL exception.
+            Thread.currentThread().interrupt();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
+                    .setRootCause(e).build().buildException();
+        }
+    }
+
+    /**
+     * Releases the side of {@code closeLock} selected by {@code mode}.
+     *
+     * @param mode which side of the lock to release; must not be null
+     */
+    private void unlock(PhoenixTestDriver.LockMode mode) {
+        checkNotNull(mode);
+        if (mode == PhoenixTestDriver.LockMode.READ) {
+            closeLock.readLock().unlock();
+        } else if (mode == PhoenixTestDriver.LockMode.WRITE) {
+            closeLock.writeLock().unlock();
+        }
+    }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java
new file mode 100644
index 0000000000..e661acad6e
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/DecodeViewIndexIdFunctionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Parser-level tests for conditions that use the {@code DECODE_VIEW_INDEX_ID} function.
+ */
+public class DecodeViewIndexIdFunctionTest {
+
+    /**
+     * Parses a condition that passes {@code VIEW_INDEX_ID} and {@code VIEW_INDEX_ID_DATA_TYPE}
+     * and checks that one of the resulting child nodes is a
+     * {@link DecodeViewIndexIdParseNode} with exactly two children.
+     */
+    @Test
+    public void testExpressionWithDecodeViewIndexIdFunction() throws Exception {
+        ParseNode parseNode = SQLParser.parseCondition(
+                "DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, VIEW_INDEX_ID_DATA_TYPE) = 32768");
+        boolean hasGetViewIndexIdParseNode = false;
+        for (ParseNode childNode : parseNode.getChildren()) {
+            // instanceof, not childNode.getClass().isAssignableFrom(...): the latter is the
+            // reversed check and would also accept an instance of any superclass of
+            // DecodeViewIndexIdParseNode.
+            if (childNode instanceof DecodeViewIndexIdParseNode) {
+                assertEquals(2, childNode.getChildren().size());
+                hasGetViewIndexIdParseNode = true;
+            }
+        }
+        assertTrue(hasGetViewIndexIdParseNode);
+    }
+
+    /**
+     * Parses a condition whose second argument is {@code b} instead of
+     * {@code VIEW_INDEX_ID_DATA_TYPE} and requires the parse to be rejected.
+     */
+    @Test
+    public void testValidationForDecodeViewIndexIdFunction() throws Exception {
+        // NOTE(review): assumes the rejection surfaces as an exception thrown from
+        // parseCondition (the previous catch block implied this) -- confirm.
+        boolean parsed = true;
+        try {
+            SQLParser.parseCondition("DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, b) = 32768");
+        } catch (Exception expected) {
+            parsed = false;
+        }
+        // Fail explicitly when the input is accepted, so the test cannot pass merely
+        // because no matching child node happened to be found.
+        assertFalse("DECODE_VIEW_INDEX_ID(VIEW_INDEX_ID, b) should have been rejected", parsed);
+    }
+
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7d5ebbb849..f9e68fefb4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -152,6 +152,7 @@ import 
org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
@@ -655,6 +656,9 @@ public abstract class BaseTest {
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+        conf.set(IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY,
+                
"org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
+
         // This results in processing one row at a time in each next operation 
of the aggregate region
         // scanner, i.e.,  one row pages. In other words, 0ms page allows only 
one row to be processed
         // within one page; 0ms page is equivalent to one-row page

Reply via email to