http://git-wip-us.apache.org/repos/asf/phoenix/blob/2fb019a2/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java.orig
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java.orig b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java.orig
new file mode 100644
index 0000000..65232ea
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java.orig
@@ -0,0 +1,1793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.schema.PTableType.INDEX;
+import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
+import static org.apache.phoenix.util.SchemaUtil.getVarChars;
+
+import java.io.IOException;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.stat.PTableStats;
+import org.apache.phoenix.schema.stat.PTableStatsImpl;
+import org.apache.phoenix.schema.stat.StatisticsUtils;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * 
+ * Endpoint co-processor through which all Phoenix metadata mutations flow.
+ * We only allow mutations to the latest version of a Phoenix table (i.e. the
+ * timeStamp must be increasing).
+ * For adding/dropping columns use a sequence number on the table to ensure that
+ * the client has the latest version.
+ * The timeStamp on the table correlates with the timeStamp on the data row.
+ * TODO: we should enforce that a metadata mutation uses a timeStamp bigger than
+ * any in use on the data table, because otherwise we can end up with data rows that
+ * are not valid against a schema row.
+ *
+ * 
+ * @since 0.1
+ */
+@SuppressWarnings("deprecation")
+public class MetaDataEndpointImpl extends MetaDataProtocol implements CoprocessorService, Coprocessor {
+    private static final Logger logger = LoggerFactory.getLogger(MetaDataEndpointImpl.class);
+
+    // KeyValues for Table
+    private static final KeyValue TABLE_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES);
+    private static final KeyValue TABLE_SEQ_NUM_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+    private static final KeyValue COLUMN_COUNT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_COUNT_BYTES);
+    private static final KeyValue SALT_BUCKETS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SALT_BUCKETS_BYTES);
+    private static final KeyValue PK_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PK_NAME_BYTES);
+    private static final KeyValue DATA_TABLE_NAME_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
+    private static final KeyValue INDEX_STATE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
+    private static final KeyValue IMMUTABLE_ROWS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IMMUTABLE_ROWS_BYTES);
+    private static final KeyValue VIEW_EXPRESSION_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_STATEMENT_BYTES);
+    private static final KeyValue DEFAULT_COLUMN_FAMILY_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DEFAULT_COLUMN_FAMILY_NAME_BYTES);
+    private static final KeyValue DISABLE_WAL_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DISABLE_WAL_BYTES);
+    private static final KeyValue MULTI_TENANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
+    private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
+    private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+    private static final KeyValue INDEX_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
+    private static final KeyValue INDEX_DISABLE_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
+    private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
+            TABLE_TYPE_KV,
+            TABLE_SEQ_NUM_KV,
+            COLUMN_COUNT_KV,
+            SALT_BUCKETS_KV,
+            PK_NAME_KV,
+            DATA_TABLE_NAME_KV,
+            INDEX_STATE_KV,
+            IMMUTABLE_ROWS_KV,
+            VIEW_EXPRESSION_KV,
+            DEFAULT_COLUMN_FAMILY_KV,
+            DISABLE_WAL_KV,
+            MULTI_TENANT_KV,
+            VIEW_TYPE_KV,
+            VIEW_INDEX_ID_KV,
+            INDEX_TYPE_KV,
+            INDEX_DISABLE_TIMESTAMP_KV
+            );
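+    // The static block below sorts these in KeyValue order so that getTable() can
+    // merge-join a row's cells (returned in sorted order) against this list in one pass.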
+    static {
+        Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    private static final int TABLE_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_TYPE_KV);
+    private static final int TABLE_SEQ_NUM_INDEX = TABLE_KV_COLUMNS.indexOf(TABLE_SEQ_NUM_KV);
+    private static final int COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(COLUMN_COUNT_KV);
+    private static final int SALT_BUCKETS_INDEX = TABLE_KV_COLUMNS.indexOf(SALT_BUCKETS_KV);
+    private static final int PK_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(PK_NAME_KV);
+    private static final int DATA_TABLE_NAME_INDEX = TABLE_KV_COLUMNS.indexOf(DATA_TABLE_NAME_KV);
+    private static final int INDEX_STATE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_STATE_KV);
+    private static final int IMMUTABLE_ROWS_INDEX = TABLE_KV_COLUMNS.indexOf(IMMUTABLE_ROWS_KV);
+    private static final int VIEW_STATEMENT_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_EXPRESSION_KV);
+    private static final int DEFAULT_COLUMN_FAMILY_INDEX = TABLE_KV_COLUMNS.indexOf(DEFAULT_COLUMN_FAMILY_KV);
+    private static final int DISABLE_WAL_INDEX = TABLE_KV_COLUMNS.indexOf(DISABLE_WAL_KV);
+    private static final int MULTI_TENANT_INDEX = TABLE_KV_COLUMNS.indexOf(MULTI_TENANT_KV);
+    private static final int VIEW_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TYPE_KV);
+    private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
+    private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
+    
+    // KeyValues for Column
+    private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
+    private static final KeyValue COLUMN_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
+    private static final KeyValue NULLABLE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, NULLABLE_BYTES);
+    private static final KeyValue DATA_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
+    private static final KeyValue ORDINAL_POSITION_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES);
+    private static final KeyValue SORT_ORDER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, SORT_ORDER_BYTES);
+    private static final KeyValue ARRAY_SIZE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ARRAY_SIZE_BYTES);
+    private static final KeyValue VIEW_CONSTANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_CONSTANT_BYTES);
+    private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
+    private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
+            DECIMAL_DIGITS_KV,
+            COLUMN_SIZE_KV,
+            NULLABLE_KV,
+            DATA_TYPE_KV,
+            ORDINAL_POSITION_KV,
+            SORT_ORDER_KV,
+            DATA_TABLE_NAME_KV, // included in both column and table row for metadata APIs
+            ARRAY_SIZE_KV,
+            VIEW_CONSTANT_KV,
+            IS_VIEW_REFERENCED_KV
+            );
+    static {
+        Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
+    }
+    private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
+    private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
+    private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
+    private static final int DATA_TYPE_INDEX = COLUMN_KV_COLUMNS.indexOf(DATA_TYPE_KV);
+    private static final int ORDINAL_POSITION_INDEX = COLUMN_KV_COLUMNS.indexOf(ORDINAL_POSITION_KV);
+    private static final int SORT_ORDER_INDEX = COLUMN_KV_COLUMNS.indexOf(SORT_ORDER_KV);
+    private static final int ARRAY_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(ARRAY_SIZE_KV);
+    private static final int VIEW_CONSTANT_INDEX = COLUMN_KV_COLUMNS.indexOf(VIEW_CONSTANT_KV);
+    private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
+    
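+    // Index of the cell holding the link type: the first cell returned for a link row.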
+    private static final int LINK_TYPE_INDEX = 0;
+
+    private static PName newPName(byte[] keyBuffer, int keyOffset, int keyLength) {
+        if (keyLength <= 0) {
+            return null;
+        }
+        int length = getVarCharLength(keyBuffer, keyOffset, keyLength);
+        return PNameFactory.newName(keyBuffer, keyOffset, length);
+    }
+
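+    /*
+     * Scans the table's header row plus all of its column and link rows: for a table keyed
+     * tenantId\0schema\0table, those rows share the key as a prefix, so the stop row is the
+     * key with a separator byte appended and then incremented to just past that prefix.
+     */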
+    private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp)
+            throws IOException {
+        Scan scan = new Scan();
+        scan.setTimeRange(startTimeStamp, stopTimeStamp);
+        scan.setStartRow(key);
+        byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY);
+        ByteUtil.nextKey(stopKey, stopKey.length);
+        scan.setStopRow(stopKey);
+        return scan;
+    }
+
+    private RegionCoprocessorEnvironment env;
+
+    private static final Log LOG = LogFactory.getLog(MetaDataEndpointImpl.class);
+
+    /**
+     * Stores a reference to the coprocessor environment provided by the
+     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+     * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
+     * on a table region, so always expects this to be an instance of
+     * {@link RegionCoprocessorEnvironment}.
+     * @param env the environment provided by the coprocessor host
+     * @throws IOException if the provided environment is not an instance of
+     *             {@code RegionCoprocessorEnvironment}
+     */
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            this.env = (RegionCoprocessorEnvironment) env;
+        } else {
+            throw new CoprocessorException("Must be loaded on a table 
region!");
+        }
+        LOG.info("Starting Tracing-Metrics Systems");
+        // Start the phoenix trace collection
+        Tracing.addTraceMetricsSource();
+        Metrics.ensureConfigured();
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        // nothing to do
+    }
+
+    @Override
+    public Service getService() {
+        return this;
+    }
+
+    @Override
+    public void getTable(RpcController controller, GetTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[] tenantId = request.getTenantId().toByteArray();
+        byte[] schemaName = request.getSchemaName().toByteArray();
+        byte[] tableName = request.getTableName().toByteArray();
+        byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+        long tableTimeStamp = request.getTableTimestamp();
+
+        try {
+            // TODO: check that key is within region.getStartKey() and region.getEndKey()
+            // and return special code to force client to lookup region from meta.
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+
+            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+            PTable table = doGetTable(key, request.getClientTimestamp());
+            if (table == null) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                builder.setMutationTime(currentTime);
+                done.run(builder.build());
+                return;
+            }
+            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+            builder.setMutationTime(currentTime);
+            
+            if (table.getTimeStamp() != tableTimeStamp) {
+                builder.setTable(PTableImpl.toProto(table));
+            }
+            done.run(builder.build());
+            return;
+        } catch (Throwable t) {
+            logger.error("getTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+        }
+    }
+
+    private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+        Cache<ImmutableBytesPtr, PTable> metaDataCache =
+                GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = metaDataCache.getIfPresent(cacheKey);
+        // We only cache the latest, so we'll end up building the table with every call if the
+        // client connection has specified an SCN.
+        // TODO: If we indicate to the client that we're returning an older version, but there's a
+        // newer version available, the client can safely not call this, since we only allow
+        // modifications to the latest.
+        if (table != null && table.getTimeStamp() < clientTimeStamp) {
+            // Table on client is up-to-date with table on server, so just return
+            if (isTableDeleted(table)) {
+                return null;
+            }
+            return table;
+        }
+        // Ask Lars about the expense of this call - if we don't take the lock, we still won't get
+        // partial results
+        // get the co-processor environment
+        // TODO: check that key is within region.getStartKey() and region.getEndKey()
+        // and return special code to force client to lookup region from meta.
+        HRegion region = env.getRegion();
+        /*
+         * Lock directly on key, though it may be an index table. This will just prevent a table
+         * from getting rebuilt too often.
+         */
+        RowLock rowLock = region.getRowLock(key);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + 
Bytes.toStringBinary(key));
+        }
+        try {
+            // Try cache again in case we were waiting on a lock
+            table = metaDataCache.getIfPresent(cacheKey);
+            // We only cache the latest, so we'll end up building the table with every call if the
+            // client connection has specified an SCN.
+            // TODO: If we indicate to the client that we're returning an older version, but there's
+            // a newer version available, the client can safely not call this, since we only allow
+            // modifications to the latest.
+            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                // Table on client is up-to-date with table on server, so just return
+                if (isTableDeleted(table)) {
+                    return null;
+                }
+                return table;
+            }
+            // Query for the latest table first, since it's not cached
+            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+            if (table != null && table.getTimeStamp() < clientTimeStamp) {
+                return table;
+            }
+            // Otherwise, query for an older version of the table - it won't be cached
+            return buildTable(key, cacheKey, region, clientTimeStamp);
+        } finally {
+            rowLock.release();
+        }
+    }
+
+    private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+            long clientTimeStamp) throws IOException, SQLException {
+        Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        RegionScanner scanner = region.getScanner(scan);
+
+        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        try {
+            PTable oldTable = metaDataCache.getIfPresent(cacheKey);
+            long tableTimeStamp = oldTable == null ? MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp();
+            PTable newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
+            if (newTable == null) {
+                return null;
+            }
+            if (oldTable == null || tableTimeStamp < newTable.getTimeStamp()) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Caching table "
+                            + Bytes.toStringBinary(cacheKey.get(), cacheKey.getOffset(),
+                                cacheKey.getLength()) + " at seqNum "
+                            + newTable.getSequenceNumber() + " with newer timestamp "
+                            + newTable.getTimeStamp() + " versus " + tableTimeStamp);
+                }
+                metaDataCache.put(cacheKey, newTable);
+            }
+            return newTable;
+        } finally {
+            scanner.close();
+        }
+    }
+
+    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
+        byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
+        PTable indexTable = doGetTable(key, clientTimeStamp);
+        if (indexTable == null) {
+            ServerUtil.throwIOException("Index not found", new 
TableNotFoundException(schemaName.getString(), indexName.getString()));
+            return;
+        }
+        indexes.add(indexTable);
+    }
+
+    private void addColumnToTable(List<Cell> results, PName colName, PName famName,
+        Cell[] colKeyValues, List<PColumn> columns, boolean isSalted) {
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < COLUMN_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = COLUMN_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            if (cmp == 0) {
+                colKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                colKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in system table column row
+            }
+        }
+
+        if (colKeyValues[DATA_TYPE_INDEX] == null || colKeyValues[NULLABLE_INDEX] == null
+                || colKeyValues[ORDINAL_POSITION_INDEX] == null) {
+            throw new IllegalStateException("Didn't find all required key 
values in '"
+                    + colName.getString() + "' column metadata row");
+        }
+
+        Cell columnSizeKv = colKeyValues[COLUMN_SIZE_INDEX];
+        Integer maxLength =
+                columnSizeKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+                    columnSizeKv.getValueArray(), columnSizeKv.getValueOffset(), SortOrder.getDefault());
+        Cell decimalDigitKv = colKeyValues[DECIMAL_DIGITS_INDEX];
+        Integer scale =
+                decimalDigitKv == null ? null : PDataType.INTEGER.getCodec().decodeInt(
+                    decimalDigitKv.getValueArray(), decimalDigitKv.getValueOffset(), SortOrder.getDefault());
+        Cell ordinalPositionKv = colKeyValues[ORDINAL_POSITION_INDEX];
+        int position =
+                PDataType.INTEGER.getCodec().decodeInt(ordinalPositionKv.getValueArray(),
+                    ordinalPositionKv.getValueOffset(), SortOrder.getDefault()) + (isSalted ? 1 : 0);
+        Cell nullableKv = colKeyValues[NULLABLE_INDEX];
+        boolean isNullable =
+                PDataType.INTEGER.getCodec().decodeInt(nullableKv.getValueArray(),
+                    nullableKv.getValueOffset(), SortOrder.getDefault()) != ResultSetMetaData.columnNoNulls;
+        Cell dataTypeKv = colKeyValues[DATA_TYPE_INDEX];
+        PDataType dataType =
+                PDataType.fromTypeId(PDataType.INTEGER.getCodec().decodeInt(
+                    dataTypeKv.getValueArray(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));
+        if (maxLength == null && dataType == PDataType.BINARY) dataType = PDataType.VARBINARY; // For backward compatibility.
+        Cell sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
+        SortOrder sortOrder =
+                sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PDataType.INTEGER
+                        .getCodec().decodeInt(sortOrderKv.getValueArray(),
+                            sortOrderKv.getValueOffset(), SortOrder.getDefault()));
+
+        Cell arraySizeKv = colKeyValues[ARRAY_SIZE_INDEX];
+        Integer arraySize = arraySizeKv == null ? null :
+            PDataType.INTEGER.getCodec().decodeInt(arraySizeKv.getValueArray(), arraySizeKv.getValueOffset(), SortOrder.getDefault());
+
+        Cell viewConstantKv = colKeyValues[VIEW_CONSTANT_INDEX];
+        byte[] viewConstant = viewConstantKv == null ? null : viewConstantKv.getValue();
+        Cell isViewReferencedKv = colKeyValues[IS_VIEW_REFERENCED_INDEX];
+        boolean isViewReferenced = isViewReferencedKv != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(isViewReferencedKv.getValueArray(), isViewReferencedKv.getValueOffset(), isViewReferencedKv.getValueLength()));
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced);
+        columns.add(column);
+    }
+    
+    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
+        throws IOException, SQLException {
+        List<Cell> results = Lists.newArrayList();
+        scanner.next(results);
+        if (results.isEmpty()) {
+            return null;
+        }
+        Cell[] tableKeyValues = new Cell[TABLE_KV_COLUMNS.size()];
+        Cell[] colKeyValues = new Cell[COLUMN_KV_COLUMNS.size()];
+    
+        // Create PTable based on KeyValues from scan
+        Cell keyValue = results.get(0);
+        byte[] keyBuffer = keyValue.getRowArray();
+        int keyLength = keyValue.getRowLength();
+        int keyOffset = keyValue.getRowOffset();
+        PName tenantId = newPName(keyBuffer, keyOffset, keyLength);
+        int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length;
+        if (tenantIdLength == 0) {
+            tenantId = null;
+        }
+        PName schemaName = newPName(keyBuffer, keyOffset+tenantIdLength+1, keyLength);
+        int schemaNameLength = schemaName.getBytes().length;
+        int tableNameLength = keyLength - schemaNameLength - 1 - tenantIdLength - 1;
+        byte[] tableNameBytes = new byte[tableNameLength];
+        System.arraycopy(keyBuffer, keyOffset + schemaNameLength + 1 + tenantIdLength + 1,
+            tableNameBytes, 0, tableNameLength);
+        PName tableName = PNameFactory.newName(tableNameBytes);
+    
+        int offset = tenantIdLength + schemaNameLength + tableNameLength + 3;
+        // This will prevent the client from continually looking for the current
+        // table when we know that there will never be one since we disallow updates
+        // unless the table is the latest
+        // If we already have a table newer than the one we just found and
+        // the client timestamp is less that the existing table time stamp,
+        // bump up the timeStamp to right before the client time stamp, since
+        // we know it can't possibly change.
+        long timeStamp = keyValue.getTimestamp();
+        // long timeStamp = tableTimeStamp > keyValue.getTimestamp() &&
+        // clientTimeStamp < tableTimeStamp
+        // ? clientTimeStamp-1
+        // : keyValue.getTimestamp();
+    
+        int i = 0;
+        int j = 0;
+        while (i < results.size() && j < TABLE_KV_COLUMNS.size()) {
+            Cell kv = results.get(i);
+            Cell searchKv = TABLE_KV_COLUMNS.get(j);
+            int cmp =
+                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                        kv.getQualifierLength(), searchKv.getQualifierArray(),
+                        searchKv.getQualifierOffset(), searchKv.getQualifierLength());
+            if (cmp == 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); // Find max timestamp of table header row
+                tableKeyValues[j++] = kv;
+                i++;
+            } else if (cmp > 0) {
+                timeStamp = Math.max(timeStamp, kv.getTimestamp()); 
+                tableKeyValues[j++] = null;
+            } else {
+                i++; // shouldn't happen - means unexpected KV in system table header row
+            }
+        }
+        // TABLE_TYPE, TABLE_SEQ_NUM and COLUMN_COUNT are required.
+        if (tableKeyValues[TABLE_TYPE_INDEX] == null || tableKeyValues[TABLE_SEQ_NUM_INDEX] == null
+                || tableKeyValues[COLUMN_COUNT_INDEX] == null) {
+            throw new IllegalStateException(
+                    "Didn't find expected key values for table row in metadata 
row");
+        }
+
+        Cell tableTypeKv = tableKeyValues[TABLE_TYPE_INDEX];
+        PTableType tableType =
+                PTableType.fromSerializedValue(tableTypeKv.getValueArray()[tableTypeKv.getValueOffset()]);
+        Cell tableSeqNumKv = tableKeyValues[TABLE_SEQ_NUM_INDEX];
+        long tableSeqNum =
+                PDataType.LONG.getCodec().decodeLong(tableSeqNumKv.getValueArray(),
+                    tableSeqNumKv.getValueOffset(), SortOrder.getDefault());
+        Cell columnCountKv = tableKeyValues[COLUMN_COUNT_INDEX];
+        int columnCount =
+                PDataType.INTEGER.getCodec().decodeInt(columnCountKv.getValueArray(),
+                    columnCountKv.getValueOffset(), SortOrder.getDefault());
+        Cell pkNameKv = tableKeyValues[PK_NAME_INDEX];
+        PName pkName =
+                pkNameKv != null ? newPName(pkNameKv.getValueArray(), pkNameKv.getValueOffset(),
+                    pkNameKv.getValueLength()) : null;
+        Cell saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
+        Integer saltBucketNum =
+                saltBucketNumKv != null ? (Integer) PDataType.INTEGER.getCodec().decodeInt(
+                    saltBucketNumKv.getValueArray(), saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
+        Cell dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
+        PName dataTableName =
+                dataTableNameKv != null ? newPName(dataTableNameKv.getValueArray(),
+                    dataTableNameKv.getValueOffset(), dataTableNameKv.getValueLength()) : null;
+        Cell indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
+        PIndexState indexState =
+                indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
+                        .getValueArray()[indexStateKv.getValueOffset()]);
+        Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
+        boolean isImmutableRows =
+                immutableRowsKv == null ? false : (Boolean) PDataType.BOOLEAN.toObject(
+                    immutableRowsKv.getValueArray(), immutableRowsKv.getValueOffset(),
+                    immutableRowsKv.getValueLength());
+        Cell defaultFamilyNameKv = tableKeyValues[DEFAULT_COLUMN_FAMILY_INDEX];
+        PName defaultFamilyName = defaultFamilyNameKv != null ? newPName(defaultFamilyNameKv.getValueArray(), defaultFamilyNameKv.getValueOffset(), defaultFamilyNameKv.getValueLength()) : null;
+        Cell viewStatementKv = tableKeyValues[VIEW_STATEMENT_INDEX];
+        String viewStatement = viewStatementKv != null ? (String)PDataType.VARCHAR.toObject(viewStatementKv.getValueArray(), viewStatementKv.getValueOffset(), viewStatementKv.getValueLength()) : null;
+        Cell disableWALKv = tableKeyValues[DISABLE_WAL_INDEX];
+        boolean disableWAL = disableWALKv == null ? PTable.DEFAULT_DISABLE_WAL : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(disableWALKv.getValueArray(), disableWALKv.getValueOffset(), disableWALKv.getValueLength()));
+        Cell multiTenantKv = tableKeyValues[MULTI_TENANT_INDEX];
+        boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
+        Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
+        ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
+        Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
+        Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
+        Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
+        IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
+        
+        List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
+        List<PTable> indexes = new ArrayList<PTable>();
+        List<PName> physicalTables = new ArrayList<PName>();
+        while (true) {
+          results.clear();
+          scanner.next(results);
+          if (results.isEmpty()) {
+              break;
+          }
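+          // Rows after the header row are either column rows or link rows; a link row has an
+          // empty column name and carries its link type in its first cell.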
+          Cell colKv = results.get(LINK_TYPE_INDEX);
+          int colKeyLength = colKv.getRowLength();
+          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+          int colKeyOffset = offset + colName.getBytes().length + 1;
+          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+          if (colName.getString().isEmpty() && famName != null) {
+              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+              if (linkType == LinkType.INDEX_TABLE) {
+                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+              } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                  physicalTables.add(famName);
+              } else {
+                  logger.warn("Unknown link type: " + colKv.getValueArray()[colKv.getValueOffset()] + " for " + SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
+              }
+          } else {
+              addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
+          }
+        }
+        PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName(
+                schemaName.getString(), tableName.getString())) : physicalTables.get(0);
+        PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns);
+        return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
+            tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null,
+            indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL,
+            multiTenant, viewType, viewIndexId, indexType, stats);
+    }
+
+    private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns)
+            throws IOException {
+        List<PName> family = Lists.newArrayListWithExpectedSize(columns.size());
+        for (PColumn column : columns) {
+            PName familyName = column.getFamilyName();
+            if (familyName != null) {
+                family.add(familyName);
+            }
+        }
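+        // Read SYSTEM.STATS rows prefixed by the physical table name and group the guide
+        // posts found there by column family.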
+        HTable statsHTable = null;
+        try {
+            // Can we do a new HTable instance here? Or get it from a pool or cache of these instances?
+            statsHTable = new HTable(this.env.getConfiguration(),
+                    PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES);
+            Scan s = new Scan();
+            if (tableNameBytes != null) {
+                // Check for an efficient way here
+                s.setStartRow(tableNameBytes);
+                s.setStopRow(ByteUtil.nextKey(tableNameBytes));
+            }
+            ResultScanner scanner = statsHTable.getScanner(s);
+            Result result = null;
+            byte[] fam = null;
+            List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size());
+            TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR);
+            while ((result = scanner.next()) != null) {
+                CellScanner cellScanner = result.cellScanner();
+                while (cellScanner.advance()) {
+                    Cell current = cellScanner.current();
+                    // For now collect only guide posts
+                    if (Bytes.equals(current.getQualifierArray(), current.getQualifierOffset(),
+                            current.getQualifierLength(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, 0,
+                            PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES.length)) {
+                        byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, current.getRowArray(),
+                                current.getRowOffset(), current.getRowLength());
+                        if (fam == null) {
+                            fam = cfInCell;
+                        } else if (!Bytes.equals(fam, cfInCell)) {
+                            // Sort all the guide posts
+                            guidePostsPerCf.put(cfInCell, guidePosts);
+                            guidePosts = new ArrayList<byte[]>();
+                            fam = cfInCell;
+                        }
+                        byte[] guidePostVal = new ImmutableBytesPtr(current.getValueArray(), current.getValueOffset(), current
+                                .getValueLength()).copyBytesIfNecessary();
+                        PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal);
+                        if (array != null && array.getDimensions() != 0) {
+                            for (int j = 0; j < array.getDimensions(); j++) {
+                                byte[] gp = array.toBytes(j);
+                                if (gp.length != 0) {
+                                    guidePosts.add(gp);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            if (fam != null) {
+                // Sort all the guideposts
+                guidePostsPerCf.put(fam, guidePosts);
+            }
+            return new PTableStatsImpl(guidePostsPerCf);
+        } catch (Exception e) {
+            if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) {
+                logger.warn("Stats table not yet online", e);
+            } else {
+                throw new IOException(e);
+            }
+        } finally {
+            if (statsHTable != null) {
+                statsHTable.close();
+            }
+        }
+        return PTableStatsImpl.NO_STATS;
+    }
+
+    private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
+        long clientTimeStamp) throws IOException {
+        if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+            return null;
+        }
+    
+        Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+        scan.setFilter(new FirstKeyOnlyFilter());
+        scan.setRaw(true);
+        RegionScanner scanner = region.getScanner(scan);
+        List<Cell> results = Lists.<Cell> newArrayList();
+        scanner.next(results);
+        // HBase ignores the time range on a raw scan (HBASE-7362)
+        if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
+            Cell kv = results.get(0);
+            if (kv.getTypeByte() == Type.Delete.getCode()) {
+                Cache<ImmutableBytesPtr, PTable> metaDataCache =
+                        GlobalCache.getInstance(this.env).getMetaDataCache();
+                PTable table = newDeletedTableMarker(kv.getTimestamp());
+                metaDataCache.put(cacheKey, table);
+                return table;
+            }
+        }
+        return null;
+    }
+
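+    // A deleted table is cached as a PTable carrying only a timestamp; isTableDeleted()
+    // recognizes the marker by its null name.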
+    private static PTable newDeletedTableMarker(long timestamp) {
+        return new PTableImpl(timestamp);
+    }
+
+    private static boolean isTableDeleted(PTable table) {
+        return table.getName() == null;
+    }
+
+    private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
+        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+        throws IOException, SQLException {
+        HRegion region = env.getRegion();
+        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = metaDataCache.getIfPresent(cacheKey);
+        // We always cache the latest version - fault in if not in cache
+        if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
+            return table;
+        }
+        // if not found then check if newer table already exists and add delete marker for timestamp found
+        if (table == null
+                && (table = buildDeletedTable(key, cacheKey, region, clientTimeStamp)) != null) {
+            return table;
+        }
+        return null;
+    }
+    
+    
+    @Override
+    public void createTable(RpcController controller, CreateTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        byte[][] rowKeyMetaData = new byte[3][];
+        byte[] schemaName = null;
+        byte[] tableName = null;
+
+        try {
+            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
+            byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
+            byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
+            byte[] key =
+                    parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes,
+                        schemaName, tableName);
+            byte[] parentKey = parentTableName == null ? null : lockKey;
+
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkTableKeyInRegion(lockKey, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            try {
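+                // lockKey is the parent table's key when a view or index is being created,
+                // so concurrent creates against the same parent serialize on this lock.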
+                acquireLock(region, lockKey, locks);
+                if (key != lockKey) {
+                    acquireLock(region, key, locks);
+                }
+                // Load parent table first
+                PTable parentTable = null;
+                ImmutableBytesPtr parentCacheKey = null;
+                if (parentKey != null) {
+                    parentCacheKey = new ImmutableBytesPtr(parentKey);
+                    parentTable = loadTable(env, parentKey, parentCacheKey, clientTimeStamp, clientTimeStamp);
+                    if (parentTable == null || isTableDeleted(parentTable)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(parentTable));
+                        done.run(builder.build());
+                        return;
+                    }
+                    // If parent table isn't at the expected sequence number, then return
+                    if (parentTable.getSequenceNumber() != MetaDataUtil
+                            .getParentSequenceNumber(tableMetadata)) {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.CONCURRENT_TABLE_MUTATION);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(parentTable));
+                        done.run(builder.build());
+                        return;
+                    }
+                }
+                // Load child table next
+                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+                // Get as of latest timestamp so we can detect if we have a newer table that already
+                // exists without making an additional query
+                PTable table = loadTable(env, key, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+                if (table != null) {
+                    if (table.getTimeStamp() < clientTimeStamp) {
+                        // If the table is older than the client time stamp and it's deleted, continue
+                        if (!isTableDeleted(table)) {
+                            builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            builder.setTable(PTableImpl.toProto(table));
+                            done.run(builder.build());
+                            return;
+                        }
+                    } else {
+                        builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
+                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                        builder.setTable(PTableImpl.toProto(table));
+                        done.run(builder.build());
+                        return;
+                    }
+                }
+                // TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
+                // system table. Basically, we get all the locks that we don't already hold for all the
+                // tableMetadata rows. This ensures we don't have deadlock situations (ensuring primary
+                // and then index table locks are held, in that order). For now, we just don't support
+                // indexing on the system table. This is an issue because of the way we manage batch
+                // mutation in the Indexer.
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+
+                // Invalidate the cache - the next getTable call will add it
+                // TODO: consider loading the table that was just created here, patching up the parent table, and updating the cache
+                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                if (parentCacheKey != null) {
+                    metaDataCache.invalidate(parentCacheKey);
+                }
+                metaDataCache.invalidate(cacheKey);
+                // Get timeStamp from mutations - the above method sets it if it's unset
+                long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
+                builder.setMutationTime(currentTimeStamp);
+                done.run(builder.build());
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+          logger.error("createTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                
ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), 
t));
+        }
+    }
+
+
+    private static void acquireLock(HRegion region, byte[] key, List<RowLock> locks)
+        throws IOException {
+        RowLock rowLock = region.getRowLock(key);
+        if (rowLock == null) {
+            throw new IOException("Failed to acquire lock on " + 
Bytes.toStringBinary(key));
+        }
+        locks.add(rowLock);
+    }
+    
+    protected static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()};
+    /**
+     * Looks for child views of the given parent table.
+     * @param table the parent table
+     * TODO: should we pass a timestamp here?
+     */
+    private TableViewFinderResult findChildViews(HRegion region, byte[] tenantId, PTable table) throws IOException {
+        byte[] schemaName = table.getSchemaName().getBytes();
+        byte[] tableName = table.getTableName().getBytes();
+        boolean isMultiTenant = table.isMultiTenant();
+        Scan scan = new Scan();
+        // If the table is multi-tenant, we need to check across all tenant_ids,
+        // so we can't constrain the row key. Otherwise, any views would have
+        // the same tenantId.
+        if (!isMultiTenant) {
+            byte[] startRow = ByteUtil.concat(tenantId, QueryConstants.SEPARATOR_BYTE_ARRAY);
+            byte[] stopRow = ByteUtil.nextKey(startRow);
+            scan.setStartRow(startRow);
+            scan.setStopRow(stopRow);
+        }
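+        // Keep only rows whose LINK_TYPE cell marks a physical-table link and whose row key
+        // ends with this table's name, i.e. the views that point at this table.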
+        SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, PHYSICAL_TABLE_BYTES);
+        linkFilter.setFilterIfMissing(true);
+        byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, 
SchemaUtil.getTableNameAsBytes(schemaName, tableName));
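+        // A child view points at its physical table via a LINK_TYPE row whose row key
+        // ends with the parent table's full name; matching the link-type column value
+        // and the row-key suffix together selects exactly those link rows.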
+        SuffixFilter rowFilter = new SuffixFilter(suffix);
+        Filter filter = new FilterList(linkFilter, rowFilter);
+        scan.setFilter(filter);
+        scan.addColumn(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+        // Original region-only scanner modified due to PHOENIX-1208
+        // RegionScanner scanner = region.getScanner(scan);
+        // The following *should* work, but doesn't due to HBASE-11837
+        // TableName systemCatalogTableName = region.getTableDesc().getTableName();
+        // HTableInterface hTable = env.getTable(systemCatalogTableName);
+        // These deprecated calls work around the issue
+        HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+        try {
+            HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+            ResultScanner scanner = hTable.getScanner(scan);
+            boolean allViewsInCurrentRegion = true;
+            int numOfChildViews = 0;
+            List<Result> results = Lists.newArrayList();
+            try {
+                for (Result result = scanner.next(); (result != null); result = scanner.next()) {
+                    numOfChildViews++;
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ResultTuple resultTuple = new ResultTuple(result);
+                    resultTuple.getKey(ptr);
+                    byte[] key = ptr.copyBytes();
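+                    // checkTableKeyInRegion returns a non-null error result when the key
+                    // lies outside this region, i.e. the view's row is hosted elsewhere.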
+                    if (checkTableKeyInRegion(key, region) != null) {
+                        allViewsInCurrentRegion = false;
+                    }
+                    results.add(result);
+                }
+            } finally {
+                scanner.close();
+                hTable.close();
+            }
+            TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results);
+            if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
+                tableViewFinderResult.setAllViewsNotInSingleRegion();
+            }
+            return tableViewFinderResult;
+        } finally {
+            pool.close();
+        }
+    }
+    
+    @Override
+    public void dropTable(RpcController controller, DropTableRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
+        boolean isCascade = request.getCascade();
+        byte[][] rowKeyMetaData = new byte[3][];
+        String tableType = request.getTableType();
+        byte[] schemaName = null;
+        byte[] tableName = null;
+
+        try {
+            List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+            MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+            byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+            schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+            tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+            // Disallow deletion of a system table
+            if (tableType.equals(PTableType.SYSTEM.getSerializedValue())) {
+                builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                done.run(builder.build());
+                return;
+            }
+            List<byte[]> tableNamesToDelete = Lists.newArrayList();
+            byte[] parentTableName = MetaDataUtil.getParentTableName(tableMetadata);
+            byte[] lockTableName = parentTableName == null ? tableName : parentTableName;
+            byte[] lockKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, lockTableName);
+            byte[] key = parentTableName == null ? lockKey : SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName);
+
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            
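+            // Acquire the parent table's lock first (lockKey), then the dropped table's
+            // own row lock, keeping the same lock order as other paths to avoid deadlock.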
+            try {
+                acquireLock(region, lockKey, locks);
+                if (key != lockKey) {
+                    acquireLock(region, key, locks);
+                }
+                List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+                result = doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName,
+                            PTableType.fromSerializedValue(tableType), tableMetadata,
+                            invalidateList, locks, tableNamesToDelete, isCascade);
+                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                    done.run(MetaDataMutationResult.toProto(result));
+                    return;
+                }
+                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                // Commit the list of deletions.
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
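+                // Cache deleted-table markers rather than merely invalidating, so that
+                // subsequent metadata lookups observe the drop without another scan.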
+                for (ImmutableBytesPtr ckey : invalidateList) {
+                    metaDataCache.put(ckey, newDeletedTableMarker(currentTime));
+                }
+                if (parentTableName != null) {
+                    ImmutableBytesPtr parentCacheKey = new ImmutableBytesPtr(lockKey);
+                    metaDataCache.invalidate(parentCacheKey);
+                }
+                done.run(MetaDataMutationResult.toProto(result));
+                return;
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+            logger.error("dropTable failed", t);
+            ProtobufUtil.setControllerException(controller,
+                ServerUtil.createIOException(SchemaUtil.getTableName(schemaName, tableName), t));
+        }
+    }
+
+    private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
+        byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
+        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
+        List<byte[]> tableNamesToDelete, boolean isCascade) throws IOException, SQLException {
+        long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
+        HRegion region = env.getRegion();
+        ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+        Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+        PTable table = metaDataCache.getIfPresent(cacheKey);
+
+        // We always cache the latest version - fault in if not in cache
+        if (table != null
+                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
+            if (table.getTimeStamp() < clientTimeStamp) {
+                if (isTableDeleted(table) || tableType != table.getType()) {
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+            } else {
+                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                        EnvironmentEdgeManager.currentTimeMillis(), null);
+            }
+        }
+        // We didn't find a table at the latest timestamp, so either there is no table or
+        // there was a table, but it's been deleted. In either case we want to return.
+        if (table == null) {
+            if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
+                return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+            }
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+        }
+        // Make sure we're not deleting the "wrong" child
+        if (!Arrays.equals(parentTableName, table.getParentTableName() == null ? null : table.getParentTableName().getBytes())) {
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+        }
+        // Since we don't allow back in time DDL, we know if we have a table it's the one
+        // we want to delete. FIXME: we shouldn't need a scan here, but should be able to
+        // use the table to generate the Delete markers.
+        Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
+        RegionScanner scanner = region.getScanner(scan);
+        List<Cell> results = Lists.newArrayList();
+        scanner.next(results);
+        if (results.isEmpty()) { // Should not be possible
+            return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+        }
+
+        // Handle any child views that exist
+        TableViewFinderResult tableViewFinderResult = findChildViews(region, tenantId, table);
+        if (tableViewFinderResult.hasViews()) {
+            if (isCascade) {
+                if (tableViewFinderResult.allViewsInMultipleRegions()) {
+                    // We don't yet support deleting a table with views where SYSTEM.CATALOG
+                    // has split and the view metadata spans multiple regions
+                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                } else if (tableViewFinderResult.allViewsInSingleRegion()) {
+                    // Recursively delete views - safe as all the views are in the same region
+                    for (Result viewResult : tableViewFinderResult.getResults()) {
+                        byte[][] rowKeyMetaData = new byte[3][];
+                        getVarChars(viewResult.getRow(), 3, rowKeyMetaData);
+                        byte[] viewTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+                        byte[] viewSchemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+                        byte[] viewName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+                        byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName);
+                        Delete delete = new Delete(viewKey, clientTimeStamp);
+                        rowsToDelete.add(delete);
+                        acquireLock(region, viewKey, locks);
+                        MetaDataMutationResult result =
+                                doDropTable(viewKey, viewTenantId, viewSchemaName, viewName, null, PTableType.VIEW,
+                                    rowsToDelete, invalidateList, locks, tableNamesToDelete, false);
+                        if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                            return result;
+                        }
+                    }
+                }
+            } else {
+                // DROP without CASCADE on tables with child views is not permitted
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+            }
+        }
+        
+        if (tableType != PTableType.VIEW) { // Add to list of HTables to delete, unless it's a view
+            tableNamesToDelete.add(table.getName().getBytes());
+        }
+        List<byte[]> indexNames = Lists.newArrayList();
+        invalidateList.add(cacheKey);
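+        // Walk every catalog row of this table up to the client timestamp, issuing a
+        // Delete per row; remember INDEX_TABLE link rows so those indexes can be
+        // dropped recursively below.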
+        byte[][] rowKeyMetaData = new byte[5][];
+        do {
+            Cell kv = results.get(LINK_TYPE_INDEX);
+            int nColumns = getVarChars(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), 0, rowKeyMetaData);
+            if (nColumns == 5
+                    && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length == 0
+                    && rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX].length > 0
+                    && Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length) == 0
+                    && LinkType.fromSerializedValue(kv.getValueArray()[kv.getValueOffset()]) == LinkType.INDEX_TABLE) {
+                indexNames.add(rowKeyMetaData[PhoenixDatabaseMetaData.INDEX_NAME_INDEX]);
+            }
+            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+            // FIXME: the version of the Delete constructor without the lock args was introduced
+            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+            // of the client.
+            Delete delete = new Delete(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), clientTimeStamp);
+            rowsToDelete.add(delete);
+            results.clear();
+            scanner.next(results);
+        } while (!results.isEmpty());
+    
+        // Recursively delete indexes
+        for (byte[] indexName : indexNames) {
+            byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, indexName);
+            // FIXME: Remove when unintentionally deprecated method is fixed (HBASE-7870).
+            // FIXME: the version of the Delete constructor without the lock args was introduced
+            // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
+            // of the client.
+            Delete delete = new Delete(indexKey, clientTimeStamp);
+            rowsToDelete.add(delete);
+            acquireLock(region, indexKey, locks);
+            MetaDataMutationResult result =
+                    doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
+                        rowsToDelete, invalidateList, locks, tableNamesToDelete, false);
+            if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                return result;
+            }
+        }
+    
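+        // Counter-intuitively, TABLE_ALREADY_EXISTS is the success code for a drop:
+        // it means the table was found and its rows are queued for deletion.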
+        return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                EnvironmentEdgeManager.currentTimeMillis(), table, tableNamesToDelete);
+    }
+
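+    // Strategy hook: mutateColumn handles the shared work (locking, table resolution,
+    // sequence-number check, commit, cache invalidation) and delegates the add- or
+    // drop-specific validation and metadata edits to a ColumnMutator.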
+    private static interface ColumnMutator {
+        MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+            List<Mutation> tableMetadata, HRegion region,
+            List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) throws IOException, SQLException;
+    }
+
+    private MetaDataMutationResult mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
+        byte[][] rowKeyMetaData = new byte[5][];
+        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+        byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        try {
+            byte[] key = SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+            HRegion region = env.getRegion();
+            MetaDataMutationResult result = checkTableKeyInRegion(key, region);
+            if (result != null) {
+                return result;
+            }
+            List<RowLock> locks = Lists.newArrayList();
+            try {
+                acquireLock(region, key, locks);
+                ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
+                List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+                invalidateList.add(cacheKey);
+                Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
+                PTable table = metaDataCache.getIfPresent(cacheKey);
+                if (logger.isDebugEnabled()) {
+                    if (table == null) {
+                        logger.debug("Table " + Bytes.toStringBinary(key)
+                                + " not found in cache. Will build through scan");
+                    } else {
+                        logger.debug("Table " + Bytes.toStringBinary(key)
+                                + " found in cache with timestamp " + table.getTimeStamp()
+                                + " seqNum " + table.getSequenceNumber());
+                    }
+                }
+                // Get client timeStamp from mutations
+                long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                if (table == null
+                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
+                    // if not found then call newerTableExists and add delete marker for timestamp found
+                    if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) {
+                        return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                                EnvironmentEdgeManager.currentTimeMillis(), null);
+                    }
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+                if (table.getTimeStamp() >= clientTimeStamp) {
+                    return new MetaDataMutationResult(MutationCode.NEWER_TABLE_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), table);
+                } else if (isTableDeleted(table)) {
+                    return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                }
+
+                // Look up TABLE_SEQ_NUM in tableMetaData
+                long expectedSeqNum = MetaDataUtil.getSequenceNumber(tableMetadata) - 1;
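+                // Optimistic concurrency check: each DDL bumps TABLE_SEQ_NUM by one, so
+                // the table's current sequence number must be exactly one less than the
+                // incoming one; otherwise a concurrent mutation won the race.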
+                if (logger.isDebugEnabled()) {
+                    logger.debug("For table " + Bytes.toStringBinary(key) + " expecting seqNum "
+                            + expectedSeqNum + " and found seqNum " + table.getSequenceNumber()
+                            + " with " + table.getColumns().size() + " columns: "
+                            + table.getColumns());
+                }
+                if (expectedSeqNum != table.getSequenceNumber()) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("For table " + Bytes.toStringBinary(key)
+                                + " returning CONCURRENT_TABLE_MUTATION due to unexpected seqNum");
+                    }
+                    return new MetaDataMutationResult(MutationCode.CONCURRENT_TABLE_MUTATION,
+                            EnvironmentEdgeManager.currentTimeMillis(), table);
+                }
+        
+                PTableType type = table.getType();
+                if (type == PTableType.INDEX) {
+                    // Disallow mutation of an index table
+                    return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION,
+                            EnvironmentEdgeManager.currentTimeMillis(), null);
+                } else {
+                    // server-side, except for indexing, we always expect the keyvalues to be standard KeyValues
+                    PTableType expectedType = MetaDataUtil.getTableType(tableMetadata, GenericKeyValueBuilder.INSTANCE, new ImmutableBytesPtr());
+                    // We said to drop a table, but found a view or vice versa
+                    if (type != expectedType) {
+                        return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    }
+                    if (findChildViews(region, tenantId, table).hasViews()) {
+                        // Disallow any column mutations for parents of tenant tables
+                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), null);
+                    }
+                }
+                result = mutator.updateMutation(table, rowKeyMetaData, tableMetadata, region,
+                            invalidateList, locks);
+                if (result != null) {
+                    return result;
+                }
+
+                region.mutateRowsWithLocks(tableMetadata, Collections.<byte[]> emptySet());
+                // Invalidate from cache
+                for (ImmutableBytesPtr invalidateKey : invalidateList) {
+                    metaDataCache.invalidate(invalidateKey);
+                }
+                // Get client timeStamp from mutations, since it may get updated by the
+                // mutateRowsWithLocks call
+                long currentTime = MetaDataUtil.getClientTimeStamp(tableMetadata);
+                return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime,
+                        null);
+            } finally {
+                region.releaseRowLocks(locks);
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t);
+            return null; // impossible
+        }
+    }
+
+
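+    // The addColumn mutator only checks that no column of the same name already exists
+    // (returning COLUMN_ALREADY_EXISTS if one does); the new column rows arrive as the
+    // client's mutations and are committed by mutateColumn itself.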
+    @Override
+    public void addColumn(RpcController controller, AddColumnRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        try {
+            List<Mutation> tableMetaData = ProtobufUtil.getMutations(request);
+            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
+                @Override
+                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+                        List<Mutation> tableMetaData, HRegion region,
+                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks) {
+                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
+                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
+                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+                    for (Mutation m : tableMetaData) {
+                        byte[] key = m.getRow();
+                        boolean addingPKColumn = false;
+                        int pkCount = getVarChars(key, rowKeyMetaData);
+                        if (pkCount > COLUMN_NAME_INDEX
+                                && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
+                                && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
+                            try {
+                                if (pkCount > FAMILY_NAME_INDEX
+                                        && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
+                                    PColumnFamily family =
+                                            table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+                                    family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                } else if (pkCount > COLUMN_NAME_INDEX
+                                        && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
+                                    addingPKColumn = true;
+                                    table.getPKColumn(new String(
+                                            rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
+                                } else {
+                                    continue;
+                                }
+                                return new MetaDataMutationResult(MutationCode.COLUMN_ALREADY_EXISTS,
+                                        EnvironmentEdgeManager.currentTimeMillis(), table);
+                            } catch (ColumnFamilyNotFoundException e) {
+                                continue;
+                            } catch (ColumnNotFoundException e) {
+                                if (addingPKColumn) {
+                                    // Add all indexes to invalidate list, as they will all be
+                                    // adding the same PK column. No need to lock them, as we
+                                    // have the parent table lock at this point.
+                                    for (PTable index : table.getIndexes()) {
+                                        invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(
+                                                tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes())));
+                                    }
+                                }
+                                continue;
+                            }
+                        }
+                    }
+                    return null;
+                }
+            });
+            if (result != null) {
+                done.run(MetaDataMutationResult.toProto(result));
+            }
+        } catch (IOException ioe) {
+            ProtobufUtil.setControllerException(controller, ioe);
+        }
+    }
+    
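+    // The dropColumn mutator also deals with indexes referencing the dropped column:
+    // an index with the column in its PK is dropped outright, while an index merely
+    // containing it is invalidated from the metadata cache.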
+    @Override
+    public void dropColumn(RpcController controller, DropColumnRequest request,
+            RpcCallback<MetaDataResponse> done) {
+        List<Mutation> tableMetaData = null;
+
+        try {
+            tableMetaData = ProtobufUtil.getMutations(request);
+            final long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetaData);
+            final List<byte[]> tableNamesToDelete = Lists.newArrayList();
+            MetaDataMutationResult result = mutateColumn(tableMetaData, new ColumnMutator() {
+                @Override
+                public MetaDataMutationResult updateMutation(PTable table, byte[][] rowKeyMetaData,
+                        List<Mutation> tableMetaData, HRegion region,
+                        List<ImmutableBytesPtr> invalidateList, List<RowLock> locks)
+                        throws IOException, SQLException {
+                        throws IOException, SQLException {
+                    byte[] tenantId = rowKeyMetaData[TENANT_ID_INDEX];
+                    byte[] schemaName = rowKeyMetaData[SCHEMA_NAME_INDEX];
+                    byte[] tableName = rowKeyMetaData[TABLE_NAME_INDEX];
+                    boolean deletePKColumn = false;
+                    List<Mutation> additionalTableMetaData = Lists.newArrayList();
+                    for (Mutation m : tableMetaData) {
+                        if (m instanceof Delete) {
+                            byte[] key = m.getRow();
+                            int pkCount = getVarChars(key, rowKeyMetaData);
+                            if (pkCount > COLUMN_NAME_INDEX
+                                    && Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
+                                    && Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
+                                PColumn columnToDelete = null;
+                                try {
+                                    if (pkCount > FAMILY_NAME_INDEX
+                                            && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
+                                        PColumnFamily family =
+                                                table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+                                        columnToDelete =
+                                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                    } else if (pkCount > COLUMN_NAME_INDEX
+                                            && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
+                                        deletePKColumn = true;
+                                        columnToDelete = table.getPKColumn(new String(
+                                                rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]));
+                                    } else {
+                                        continue;
+                                    }
+                                    // Disallow deletion of a column referenced in the WHERE clause of a view
+                                    if (columnToDelete.isViewReferenced()) {
+                                        return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), table, columnToDelete);
+                                    }
+                                    // Look for columnToDelete in any indexes. If found as PK
+                                    // column, get lock and drop the index. If found as covered
+                                    // column, delete from index (do this client side?).
+                                    // In either case, invalidate index if the column is in it
+                                    for (PTable index : table.getIndexes()) {
+                                        try {
+                                            String indexColumnName = IndexUtil.getIndexColumnName(columnToDelete);
+                                            PColumn indexColumn = index.getColumn(indexColumnName);
+                                            byte[] indexKey =
+                                                    SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes());
+                                            // If the index contains the column in its PK, then drop it
+                                            if (SchemaUtil.isPKColumn(indexColumn)) {
+                                                // Since we're dropping the index, lock it to ensure
+                                                // that a change in index state doesn't
+                                                // occur while we're dropping it.
+                                                acquireLock(region, indexKey, locks);
+                                                // Drop the index table. The doDropTable will expand
+                                                // this to all of the table rows and invalidate the
+                                                // index table
+                                                additionalTableMetaData.add(new Delete(indexKey, clientTimeStamp));
+                                                byte[] linkKey = MetaDataUtil.getParentLinkKey(tenantId,
+                                                            schemaName, tableName, index.getTableName().getBytes());
+                                                // Drop the link between the data table and the
+                                                // index table
+                                                additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
+                                                doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index.getTableName().getBytes(), tableName,
+                                                    index.getType(), additionalTableMetaData, invalidateList, locks, tableNamesToDelete, false);
+                                                // TODO: return in result?
+                                            } else {
+                                                invalidateList.add(new ImmutableBytesPtr(indexKey));
+                                            }
+                                        } catch (ColumnNotFoundException e) {
+                                            // Index does not contain the column; nothing to do
+                                        } catch (AmbiguousColumnException e) {
+                                            // Ignore: column name is ambiguous within this index
+                                        }
+                                    }
+                                } catch (ColumnFamilyNotFoundException e) {
+                                    return new MetaDataMutationResult(
+                                            MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
+                        

<TRUNCATED>
