Repository: phoenix
Updated Branches:
  refs/heads/txn 81e52e85b -> 92ee51a0d


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 1608548..f2423ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -145,6 +145,7 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.PrimaryKeyConstraint;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.UpdateStatisticsStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -164,6 +165,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,7 +297,7 @@ public class MetaDataClient {
         MetaDataMutationResult result = updateCache(schemaName, tableName, 
true);
         return result.getMutationTime();
     }
-
+    
     /**
      * Update the cache with the latest as of the connection scn.
      * @param schemaName
@@ -314,30 +316,50 @@ public class MetaDataClient {
     public MetaDataMutationResult updateCache(PName tenantId, String 
schemaName, String tableName) throws SQLException {
         return updateCache(tenantId, schemaName, tableName, false);
     }
-
-    private long getClientTimeStamp() {
-        Long scn = connection.getSCN();
-        long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
-        return clientTimeStamp;
+    
+    public MetaDataMutationResult updateCache(PName tenantId, String 
schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+        return updateCache(tenantId, schemaName, tableName, alwaysHitServer, 
null);
     }
-
+    
+    private long getCurrentScn() {
+       Long scn = connection.getSCN();
+        long currentScn = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+        return currentScn;
+    }
+    
     private MetaDataMutationResult updateCache(PName tenantId, String 
schemaName, String tableName,
-            boolean alwaysHitServer) throws SQLException { // TODO: pass 
byte[] herez
-        long clientTimeStamp = getClientTimeStamp();
+            boolean alwaysHitServer, Long resolvedTimestamp) throws 
SQLException { // TODO: pass byte[] herez
         boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName);
         // System tables must always have a null tenantId
         tenantId = systemTable ? null : tenantId;
         PTable table = null;
+        PTableRef tableRef = null;
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         long tableTimestamp = HConstants.LATEST_TIMESTAMP;
+        long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP;
         try {
-            table = connection.getMetaDataCache().getTable(new 
PTableKey(tenantId, fullTableName));
+            tableRef = connection.getTableRef(new PTableKey(tenantId, 
fullTableName));
+            table = tableRef.getTable();
             tableTimestamp = table.getTimeStamp();
+            tableResolvedTimestamp = tableRef.getResolvedTimeStamp();
         } catch (TableNotFoundException e) {
         }
-        // Don't bother with server call: we can't possibly find a newer table
-        if (table != null && !alwaysHitServer && (systemTable || 
tableTimestamp == clientTimeStamp - 1)) {
-            return new 
MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,table);
+        
+        boolean defaultTransactional = 
connection.getQueryServices().getProps().getBoolean(
+                                                                       
QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB,
+                                                                       
QueryServicesOptions.DEFAULT_TRANSACTIONAL);
+        // start a txn if all table are transactional by default or if we 
found the table in the cache and it is transactional
+        // TODO if system tables become transactional remove the check 
+        boolean isTransactional = defaultTransactional || (table!=null && 
table.isTransactional());
+        if (!systemTable && isTransactional && 
connection.getMutationState().getTransaction()==null)
+               connection.getMutationState().startTransaction();
+        resolvedTimestamp = resolvedTimestamp==null ? 
TransactionUtil.getResolvedTimestamp(connection, isTransactional, 
HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
+        // Do not make rpc to getTable if 
+        // 1. table is a system table
+        // 2. table was already resolved as of that timestamp
+               if (table != null && !alwaysHitServer
+                               && (systemTable || resolvedTimestamp == 
tableResolvedTimestamp)) {
+            return new 
MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 
QueryConstants.UNSET_TIMESTAMP, table);
         }
 
         int maxTryCount = tenantId == null ? 1 : 2;
@@ -347,7 +369,12 @@ public class MetaDataClient {
         do {
             final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
             final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
-            result = connection.getQueryServices().getTable(tenantId, 
schemaBytes, tableBytes, tableTimestamp, clientTimeStamp);
+            ConnectionQueryServices queryServices = 
connection.getQueryServices();
+                       result = queryServices.getTable(tenantId, schemaBytes, 
tableBytes, tableTimestamp, resolvedTimestamp);
+                       // if the table was assumed to be transactional, but is 
actually not transactional then re-resolve as of the right timestamp (and vice 
versa) 
+                       if (table==null && result.getTable()!=null && 
result.getTable().isTransactional()!=isTransactional) {
+                               result = queryServices.getTable(tenantId, 
schemaBytes, tableBytes, tableTimestamp, 
TransactionUtil.getResolvedTimestamp(connection, 
result.getTable().isTransactional(), HConstants.LATEST_TIMESTAMP));
+                       }
 
             if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) {
                 return result;
@@ -372,14 +399,19 @@ public class MetaDataClient {
                 // server again.
                 if (table != null) {
                     // Ensures that table in result is set to table found in 
our cache.
-                    result.setTable(table);
                     if (code == MutationCode.TABLE_ALREADY_EXISTS) {
-                        // Although this table is up-to-date, the parent table 
may not be.
-                        // In this case, we update the parent table which may 
in turn pull
-                        // in indexes to add to this table.
-                        if (addIndexesFromPhysicalTable(result)) {
-                            connection.addTable(result.getTable());
-                        }
+                       result.setTable(table);
+                       // Although this table is up-to-date, the parent table 
may not be.
+                       // In this case, we update the parent table which may 
in turn pull
+                       // in indexes to add to this table.
+                       long resolvedTime = 
TransactionUtil.getResolvedTime(connection, result);
+                                               if 
(addIndexesFromPhysicalTable(result, resolvedTimestamp)) {
+                           connection.addTable(result.getTable(), 
resolvedTime);
+                       }
+                       else {
+                               // if we aren't adding the table, we still need 
to update the resolved time of the table 
+                               connection.updateResolvedTimestamp(table, 
resolvedTime);
+                       }
                         return result;
                     }
                     // If table was not found at the current time stamp and we 
have one cached, remove it.
@@ -400,10 +432,11 @@ public class MetaDataClient {
      * of the table for which we just updated.
      * TODO: combine this round trip with the one that updates the cache for 
the child table.
      * @param result the result from updating the cache for the current table.
+     * @param resolvedTimestamp timestamp at which child table was resolved
      * @return true if the PTable contained by result was modified and false 
otherwise
      * @throws SQLException if the physical table cannot be found
      */
-    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) 
throws SQLException {
+    private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result, 
Long resolvedTimestamp) throws SQLException {
         PTable table = result.getTable();
         // If not a view or if a view directly over an HBase table, there's 
nothing to do
         if (table.getType() != PTableType.VIEW || table.getViewType() == 
ViewType.MAPPED) {
@@ -412,7 +445,7 @@ public class MetaDataClient {
         String physicalName = table.getPhysicalName().getString();
         String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName);
         String tableName = SchemaUtil.getTableNameFromFullName(physicalName);
-        MetaDataMutationResult parentResult = updateCache(null, schemaName, 
tableName, false);
+        MetaDataMutationResult parentResult = updateCache(null, schemaName, 
tableName, false, resolvedTimestamp);
         PTable physicalTable = parentResult.getTable();
         if (physicalTable == null) {
             throw new TableNotFoundException(schemaName, tableName);
@@ -1085,19 +1118,19 @@ public class MetaDataClient {
                 // as there's no need to burn another sequence value.
                 if (allocateIndexId && indexId == null) {
                     Long scn = connection.getSCN();
-                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP 
: scn;
                     PName tenantId = connection.getTenantId();
                     String tenantIdStr = tenantId == null ? null : 
connection.getTenantId().getString();
                     PName physicalName = dataTable.getPhysicalName();
                     int nSequenceSaltBuckets = 
connection.getQueryServices().getSequenceSaltBuckets();
                     SequenceKey key = 
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, 
nSequenceSaltBuckets);
-                    // Create at parent timestamp as we know that will be 
earlier than now
-                    // and earlier than any SCN if one is set.
+                    // if scn is set create at scn-1, so we can see the 
sequence or else use latest timestamp (so that latest server time is used)
+                    long sequenceTimestamp = scn!=null ? scn-1 : 
HConstants.LATEST_TIMESTAMP;
                     createSequence(key.getTenantId(), key.getSchemaName(), 
key.getSequenceName(),
                         true, Short.MIN_VALUE, 1, 1, false, Long.MIN_VALUE, 
Long.MAX_VALUE,
-                        dataTable.getTimeStamp());
+                        sequenceTimestamp);
                     long[] seqValues = new long[1];
                     SQLException[] sqlExceptions = new SQLException[1];
+                    long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP 
: scn;
                     
connection.getQueryServices().incrementSequences(Collections.singletonList(key),
 timestamp, seqValues, sqlExceptions);
                     if (sqlExceptions[0] != null) {
                         throw sqlExceptions[0];
@@ -1212,8 +1245,10 @@ public class MetaDataClient {
             boolean isImmutableRows = false;
             List<PName> physicalNames = Collections.emptyList();
             boolean addSaltColumn = false;
+            Long timestamp = null;
             if (parent != null) {
                 transactional = parent.isTransactional();
+                timestamp = TransactionUtil.getTableTimestamp(connection, 
transactional, null);
                 storeNulls = parent.getStoreNulls();
                 if (tableType == PTableType.INDEX) {
                     // Index on view
@@ -1246,7 +1281,7 @@ public class MetaDataClient {
                     incrementStatement.execute();
                     // Get list of mutations and add to table meta data that 
will be passed to server
                     // to guarantee order. This row will always end up last
-                    
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                     connection.rollback();
     
                     // Add row linking from data table row to index table row
@@ -1366,6 +1401,7 @@ public class MetaDataClient {
                        .build().buildException();
                }
             }
+            timestamp = timestamp==null ? 
TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp;
 
             // Delay this check as it is supported to have IMMUTABLE_ROWS and 
SALT_BUCKETS defined on views
             if (statement.getTableType() == PTableType.VIEW || indexId != 
null) {
@@ -1600,7 +1636,7 @@ public class MetaDataClient {
                         Collections.<PName>emptyList(), defaultFamilyName == 
null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
                         Boolean.TRUE.equals(disableWAL), false, false, null, 
indexId, indexType, false);
-                connection.addTable(table);
+                connection.addTable(table, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
                     int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size();
@@ -1655,7 +1691,7 @@ public class MetaDataClient {
                 addColumnMutation(schemaName, tableName, column, colUpsert, 
parentTableName, pkName, keySeq, saltBucketNum != null);
             }
 
-            
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             String dataTableName = parent == null || tableType == 
PTableType.VIEW ? null : parent.getTableName().getString();
@@ -1699,7 +1735,7 @@ public class MetaDataClient {
             tableUpsert.setBoolean(20, transactional);
             tableUpsert.execute();
 
-            
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+            
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
 
             /*
@@ -1747,7 +1783,7 @@ public class MetaDataClient {
             default:
                 PName newSchemaName = PNameFactory.newName(schemaName);
                 PTable table =  PTableImpl.makePTable(
-                        tenantId, newSchemaName, 
PNameFactory.newName(tableName), tableType, indexState, 
result.getMutationTime(),
+                        tenantId, newSchemaName, 
PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? 
timestamp : result.getMutationTime(),
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : 
PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, 
dataTableName == null ? null : PNameFactory.newName(dataTableName), 
Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : 
PNameFactory.newName(defaultFamilyName), viewStatement, 
Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
@@ -1862,6 +1898,8 @@ public class MetaDataClient {
 
             MetaDataMutationResult result = 
connection.getQueryServices().dropTable(tableMetaData, tableType, cascade);
             MutationCode code = result.getMutationCode();
+            PTable table = result.getTable();
+                       boolean transactional = table!=null && 
table.isTransactional();
             switch (code) {
             case TABLE_NOT_FOUND:
                 if (!ifExists) { throw new TableNotFoundException(schemaName, 
tableName); }
@@ -1873,12 +1911,11 @@ public class MetaDataClient {
 
                 
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
             default:
-                connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, tableName), parentTableName,
-                        result.getMutationTime());
+                               connection.removeTable(tenantId, 
SchemaUtil.getTableName(schemaName, tableName), parentTableName,
+                        TransactionUtil.getTableTimestamp(connection, 
transactional, result.getMutationTime()));
 
                 if (result.getTable() != null && tableType != PTableType.VIEW) 
{
                     connection.setAutoCommit(true);
-                    PTable table = result.getTable();
                     boolean dropMetaData = result.getTable().getViewIndexId() 
== null &&
                             
connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, 
DEFAULT_DROP_METADATA);
                     long ts = (scn == null ? result.getMutationTime() : scn);
@@ -2094,7 +2131,9 @@ public class MetaDataClient {
 
             ListMultimap<String,Pair<String,Object>> stmtProperties = 
statement.getProps();
             Map<String, List<Pair<String, Object>>> properties = new 
HashMap<>(stmtProperties.size());
-            PTable table = FromCompiler.getResolver(statement, 
connection).getTables().get(0).getTable();
+            TableRef tableRef = FromCompiler.getResolver(statement, 
connection).getTables().get(0);
+                       PTable table = tableRef.getTable();
+            Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() 
: null;
             List<ColumnDef> columnDefs = statement.getColumnDefs();
             if (columnDefs == null) {
                 columnDefs = Collections.emptyList();
@@ -2236,7 +2275,7 @@ public class MetaDataClient {
                         }
                     }
 
-                    
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 } else {
                     // Check that HBase configured properly for mutable 
secondary indexing
@@ -2260,13 +2299,13 @@ public class MetaDataClient {
                     for (PTable index : table.getIndexes()) {
                         incrementTableSeqNum(index, index.getType(), 1);
                     }
-                    
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 long seqNum = table.getSequenceNumber();
                 if (changingPhoenixTableProperty || columnDefs.size() > 0) { 
                     seqNum = incrementTableSeqNum(table, 
statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant, 
storeNulls);
-                    
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
                 }
                 
@@ -2305,7 +2344,17 @@ public class MetaDataClient {
                     // Only update client side cache if we aren't adding a PK 
column to a table with indexes.
                     // We could update the cache manually then too, it'd just 
be a pain.
                     if (numPkColumnsAdded==0 || table.getIndexes().isEmpty()) {
-                        connection.addColumn(tenantId, 
SchemaUtil.getTableName(schemaName, tableName), columns, 
result.getMutationTime(), seqNum, isImmutableRows == null ? 
table.isImmutableRows() : isImmutableRows, disableWAL == null ? 
table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() 
: multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls);
+                                               connection.addColumn(
+                                                               tenantId,
+                                                               
SchemaUtil.getTableName(schemaName, tableName),
+                                                               columns,
+                                                               
result.getMutationTime(),
+                                                               seqNum,
+                                                               isImmutableRows 
== null ? table.isImmutableRows() : isImmutableRows,
+                                                               disableWAL == 
null ? table.isWALDisabled() : disableWAL,
+                                                               multiTenant == 
null ? table.isMultiTenant() : multiTenant,
+                                                               storeNulls == 
null ? table.getStoreNulls() : storeNulls, 
+                                                               
TransactionUtil.getResolvedTime(connection, result));
                     }
                     // Delete rows in view index if we haven't dropped it 
already
                     // We only need to do this if the multiTenant transitioned 
to false
@@ -2436,12 +2485,12 @@ public class MetaDataClient {
             boolean retried = false;
             while (true) {
                 final ColumnResolver resolver = 
FromCompiler.getResolver(statement, connection);
-                PTable table = resolver.getTables().get(0).getTable();
+                TableRef tableRef = resolver.getTables().get(0);
+                               PTable table = tableRef.getTable();
                 List<ColumnName> columnRefs = statement.getColumnRefs();
                 if(columnRefs == null) {
                     columnRefs = Lists.newArrayListWithCapacity(0);
                 }
-                TableRef tableRef = null;
                 List<ColumnRef> columnsToDrop = 
Lists.newArrayListWithExpectedSize(columnRefs.size() + 
table.getIndexes().size());
                 List<TableRef> indexesToDrop = 
Lists.newArrayListWithExpectedSize(table.getIndexes().size());
                 List<Mutation> tableMetaData = 
Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + 
table.getColumns().size() - columnRefs.size()));
@@ -2457,30 +2506,31 @@ public class MetaDataClient {
                         }
                         throw e;
                     }
-                    tableRef = columnRef.getTableRef();
                     PColumn columnToDrop = columnRef.getColumn();
                     tableColumnsToDrop.add(columnToDrop);
                     if (SchemaUtil.isPKColumn(columnToDrop)) {
                         throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_DROP_PK)
                             
.setColumnName(columnToDrop.getName().getString()).build().buildException();
                     }
-                    columnsToDrop.add(new ColumnRef(tableRef, 
columnToDrop.getPosition()));
+                    columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), 
columnToDrop.getPosition()));
                 }
 
                 dropColumnMutations(table, tableColumnsToDrop, tableMetaData);
                 for (PTable index : table.getIndexes()) {
+                       IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(table, connection);
+                       // get the columns required for the index pk
+                       Set<ColumnReference> indexColumns = 
indexMaintainer.getIndexedColumns();
+                       // get the covered columns 
+                       Set<ColumnReference> coveredColumns = 
indexMaintainer.getCoverededColumns();
                     List<PColumn> indexColumnsToDrop = 
Lists.newArrayListWithExpectedSize(columnRefs.size());
                     for(PColumn columnToDrop : tableColumnsToDrop) {
-                        String indexColumnName = 
IndexUtil.getIndexColumnName(columnToDrop);
-                        try {
-                            PColumn indexColumn = 
index.getColumn(indexColumnName);
-                            if (SchemaUtil.isPKColumn(indexColumn)) {
-                                indexesToDrop.add(new TableRef(index));
-                            } else {
-                                indexColumnsToDrop.add(indexColumn);
-                                columnsToDrop.add(new ColumnRef(tableRef, 
columnToDrop.getPosition()));
-                            }
-                        } catch (ColumnNotFoundException e) {
+                       ColumnReference columnToDropRef = new 
ColumnReference(columnToDrop.getFamilyName().getBytes(), 
columnToDrop.getName().getBytes());
+                       if (indexColumns.contains(columnToDropRef)) {
+                               indexesToDrop.add(new TableRef(index));
+                       } 
+                       else if (coveredColumns.contains(columnToDropRef)) {
+                               String indexColumnName = 
IndexUtil.getIndexColumnName(columnToDrop);
+                               
indexColumnsToDrop.add(index.getColumn(indexColumnName));
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
@@ -2489,11 +2539,12 @@ public class MetaDataClient {
                     }
 
                 }
-                
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                Long timeStamp = table.isTransactional() ? 
tableRef.getTimeStamp() : null;
+                
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
                 long seqNum = incrementTableSeqNum(table, 
statement.getTableType(), -1);
-                
tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond());
+                
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
                 Collections.reverse(tableMetaData);
@@ -2544,7 +2595,7 @@ public class MetaDataClient {
                     // client-side cache as it would be too painful. Just let 
it pull it over from
                     // the server when needed.
                     if (tableColumnsToDrop.size() > 0 && 
indexesToDrop.isEmpty()) {
-                        connection.removeColumn(tenantId, 
SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, 
result.getMutationTime(), seqNum);
+                        connection.removeColumn(tenantId, 
SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, 
result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, 
result));
                     }
                     // If we have a VIEW, then only delete the metadata, and 
leave the table data alone
                     if (table.getType() != PTableType.VIEW) {
@@ -2586,7 +2637,7 @@ public class MetaDataClient {
                     if (retried) {
                         throw e;
                     }
-                    table = connection.getMetaDataCache().getTable(new 
PTableKey(tenantId, fullTableName));
+                    table = connection.getTable(new PTableKey(tenantId, 
fullTableName));
                     retried = true;
                 }
             }
@@ -2629,7 +2680,8 @@ public class MetaDataClient {
                     tableUpsert.close();
                 }
             }
-            List<Mutation> tableMetadata = 
connection.getMutationState().toMutations().next().getSecond();
+            Long timeStamp = indexRef.getTable().isTransactional() ? 
indexRef.getTimeStamp() : null;
+                       List<Mutation> tableMetadata = 
connection.getMutationState().toMutations(timeStamp).next().getSecond();
             connection.rollback();
 
             MetaDataMutationResult result = 
connection.getQueryServices().updateIndexState(tableMetadata, dataTableName);
@@ -2675,9 +2727,9 @@ public class MetaDataClient {
     }
 
     private PTable addTableToCache(MetaDataMutationResult result) throws 
SQLException {
-        addIndexesFromPhysicalTable(result);
+        addIndexesFromPhysicalTable(result, null);
         PTable table = result.getTable();
-        connection.addTable(table);
+        connection.addTable(table, TransactionUtil.getResolvedTime(connection, 
result));
         return table;
     }
     
@@ -2700,13 +2752,14 @@ public class MetaDataClient {
          */
         boolean isSharedIndex = table.getViewIndexId() != null;
         if (isSharedIndex) {
-            return 
connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), 
getClientTimeStamp());
+               // we are assuming the stats table is not transactional
+            return 
connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), 
getCurrentScn());
         }
         boolean isView = table.getType() == PTableType.VIEW;
         String physicalName = table.getPhysicalName().getString();
         if (isView && table.getViewType() != ViewType.MAPPED) {
             try {
-                return connection.getMetaDataCache().getTable(new 
PTableKey(null, physicalName)).getTableStats();
+                return connection.getTable(new PTableKey(null, 
physicalName)).getTableStats();
             } catch (TableNotFoundException e) {
                 // Possible when the table timestamp == current timestamp - 1.
                 // This would be most likely during the initial index build of 
a view index

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
index c104473..207bc2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema;
 
+import java.sql.SQLException;
+
 import org.apache.phoenix.query.MetaDataMutated;
 
 
@@ -26,6 +28,6 @@ public interface PMetaData extends MetaDataMutated, 
Iterable<PTable>, Cloneable
     }
     public int size();
     public PMetaData clone();
-    public PTable getTable(PTableKey key) throws TableNotFoundException;
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException;
     public PMetaData pruneTables(Pruner pruner);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 2f84c95..8cfbb18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -40,32 +40,12 @@ import com.google.common.primitives.Longs;
  *
  */
 public class PMetaDataImpl implements PMetaData {
-        private static final class PTableRef {
-            public final PTable table;
-            public final int estSize;
-            public volatile long lastAccessTime;
-            
-            public PTableRef(PTable table, long lastAccessTime, int estSize) {
-                this.table = table;
-                this.lastAccessTime = lastAccessTime;
-                this.estSize = estSize;
-            }
-
-            public PTableRef(PTable table, long lastAccessTime) {
-                this (table, lastAccessTime, table.getEstimatedSize());
-            }
-
-            public PTableRef(PTableRef tableRef) {
-                this (tableRef.table, tableRef.lastAccessTime, 
tableRef.estSize);
-            }
-        }
-
         private static class PTableCache implements Cloneable {
             private static final int MIN_REMOVAL_SIZE = 3;
             private static final Comparator<PTableRef> COMPARATOR = new 
Comparator<PTableRef>() {
                 @Override
                 public int compare(PTableRef tableRef1, PTableRef tableRef2) {
-                    return Longs.compare(tableRef1.lastAccessTime, 
tableRef2.lastAccessTime);
+                    return Longs.compare(tableRef1.getLastAccessTime(), 
tableRef2.getLastAccessTime());
                 }
             };
             private static final MinMaxPriorityQueue.Builder<PTableRef> 
BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR);
@@ -88,7 +68,7 @@ public class PMetaDataImpl implements PMetaData {
                 Map<PTableKey,PTableRef> newTables = 
newMap(Math.max(tables.size(),expectedCapacity));
                 // Copy value so that access time isn't changing anymore
                 for (PTableRef tableAccess : tables.values()) {
-                    newTables.put(tableAccess.table.getKey(), new 
PTableRef(tableAccess));
+                    newTables.put(tableAccess.getTable().getKey(), new 
PTableRef(tableAccess));
                 }
                 return newTables;
             }
@@ -114,7 +94,7 @@ public class PMetaDataImpl implements PMetaData {
                 if (tableAccess == null) {
                     return null;
                 }
-                tableAccess.lastAccessTime = timeKeeper.getCurrentTime();
+                tableAccess.setLastAccessTime(timeKeeper.getCurrentTime());
                 return tableAccess;
             }
             
@@ -138,37 +118,37 @@ public class PMetaDataImpl implements PMetaData {
                 // Add to new cache, but track references to remove when done
                 // to bring cache at least overage amount below it's max size.
                 for (PTableRef tableRef : tables.values()) {
-                    newCache.put(tableRef.table.getKey(), new 
PTableRef(tableRef));
+                    newCache.put(tableRef.getTable().getKey(), new 
PTableRef(tableRef));
                     toRemove.add(tableRef);
-                    toRemoveBytes += tableRef.estSize;
-                    if (toRemoveBytes - toRemove.peekLast().estSize > overage) 
{
+                    toRemoveBytes += tableRef.getEstSize();
+                    if (toRemoveBytes - toRemove.peekLast().getEstSize() > 
overage) {
                         PTableRef removedRef = toRemove.removeLast();
-                        toRemoveBytes -= removedRef.estSize;
+                        toRemoveBytes -= removedRef.getEstSize();
                     }
                 }
                 for (PTableRef toRemoveRef : toRemove) {
-                    newCache.remove(toRemoveRef.table.getKey());
+                    newCache.remove(toRemoveRef.getTable().getKey());
                 }
                 return newCache;
             }
 
             private PTable put(PTableKey key, PTableRef ref) {
-                currentByteSize += ref.estSize;
+                currentByteSize += ref.getEstSize();
                 PTableRef oldTableAccess = tables.put(key, ref);
                 PTable oldTable = null;
                 if (oldTableAccess != null) {
-                    currentByteSize -= oldTableAccess.estSize;
-                    oldTable = oldTableAccess.table;
+                    currentByteSize -= oldTableAccess.getEstSize();
+                    oldTable = oldTableAccess.getTable();
                 }
                 return oldTable;
             }
 
-            public PTable put(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, 
timeKeeper.getCurrentTime()));
+            public PTable put(PTableKey key, PTable value, long resolvedTime) {
+                return put(key, new PTableRef(value, 
timeKeeper.getCurrentTime(), resolvedTime));
             }
             
-            public PTable putDuplicate(PTableKey key, PTable value) {
-                return put(key, new PTableRef(value, 
timeKeeper.getCurrentTime(), 0));
+            public PTable putDuplicate(PTableKey key, PTable value, long 
resolvedTime) {
+                return put(key, new PTableRef(value, 
timeKeeper.getCurrentTime(), 0, resolvedTime));
             }
             
             public PTable remove(PTableKey key) {
@@ -176,8 +156,8 @@ public class PMetaDataImpl implements PMetaData {
                 if (value == null) {
                     return null;
                 }
-                currentByteSize -= value.estSize;
-                return value.table;
+                currentByteSize -= value.getEstSize();
+                return value.getTable();
             }
             
             public Iterator<PTable> iterator() {
@@ -191,7 +171,7 @@ public class PMetaDataImpl implements PMetaData {
 
                     @Override
                     public PTable next() {
-                        return iterator.next().table;
+                        return iterator.next().getTable();
                     }
 
                     @Override
@@ -235,12 +215,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PTable getTable(PTableKey key) throws TableNotFoundException {
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
         PTableRef ref = metaData.get(key);
         if (ref == null) {
             throw new TableNotFoundException(key.getName());
         }
-        return ref.table;
+        return ref;
     }
 
     @Override
@@ -248,22 +228,29 @@ public class PMetaDataImpl implements PMetaData {
         return metaData.size();
     }
 
+    @Override
+    public PMetaData updateResolvedTimestamp(PTable table, long 
resolvedTimestamp) throws SQLException {
+       PTableCache clone = metaData.clone();
+       clone.putDuplicate(table.getKey(), table, resolvedTimestamp);
+       return new PMetaDataImpl(clone);
+    }
 
     @Override
-    public PMetaData addTable(PTable table) throws SQLException {
+    public PMetaData addTable(PTable table, long resolvedTime) throws 
SQLException {
         int netGain = 0;
         PTableKey key = table.getKey();
         PTableRef oldTableRef = metaData.get(key);
         if (oldTableRef != null) {
-            netGain -= oldTableRef.estSize;
+            netGain -= oldTableRef.getEstSize();
         }
         PTable newParentTable = null;
+        long parentResolvedTimestamp = resolvedTime;
         if (table.getParentName() != null) { // Upsert new index table into 
parent data table list
             String parentName = table.getParentName().getString();
             PTableRef oldParentRef = metaData.get(new 
PTableKey(table.getTenantId(), parentName));
             // If parentTable isn't cached, that's ok we can skip this
             if (oldParentRef != null) {
-                List<PTable> oldIndexes = oldParentRef.table.getIndexes();
+                List<PTable> oldIndexes = oldParentRef.getTable().getIndexes();
                 List<PTable> newIndexes = 
Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1);
                 newIndexes.addAll(oldIndexes);
                 for (int i = 0; i < newIndexes.size(); i++) {
@@ -274,8 +261,8 @@ public class PMetaDataImpl implements PMetaData {
                     }
                 }
                 newIndexes.add(table);
-                netGain -= oldParentRef.estSize;
-                newParentTable = PTableImpl.makePTable(oldParentRef.table, 
table.getTimeStamp(), newIndexes);
+                netGain -= oldParentRef.getEstSize();
+                newParentTable = 
PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), 
newIndexes);
                 netGain += newParentTable.getEstimatedSize();
             }
         }
@@ -286,24 +273,24 @@ public class PMetaDataImpl implements PMetaData {
         PTableCache tables = overage <= 0 ? metaData.clone() : 
metaData.cloneMinusOverage(overage);
         
         if (newParentTable != null) { // Upsert new index table into parent 
data table list
-            tables.put(newParentTable.getKey(), newParentTable);
-            tables.putDuplicate(table.getKey(), table);
+            tables.put(newParentTable.getKey(), newParentTable, 
parentResolvedTimestamp);
+            tables.putDuplicate(table.getKey(), table, resolvedTime);
         } else {
-            tables.put(table.getKey(), table);
+            tables.put(table.getKey(), table, resolvedTime);
         }
         for (PTable index : table.getIndexes()) {
-            tables.putDuplicate(index.getKey(), index);
+            tables.putDuplicate(index.getKey(), index, resolvedTime);
         }
         return new PMetaDataImpl(tables);
     }
 
     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> 
columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, 
boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws 
SQLException {
+    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> 
columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, 
boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long 
resolvedTime) throws SQLException {
         PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, 
tableName));
         if (oldTableRef == null) {
             return this;
         }
-        List<PColumn> oldColumns = 
PTableImpl.getColumnsToClone(oldTableRef.table);
+        List<PColumn> oldColumns = 
PTableImpl.getColumnsToClone(oldTableRef.getTable());
         List<PColumn> newColumns;
         if (columnsToAdd.isEmpty()) {
             newColumns = oldColumns;
@@ -312,8 +299,8 @@ public class PMetaDataImpl implements PMetaData {
             newColumns.addAll(oldColumns);
             newColumns.addAll(columnsToAdd);
         }
-        PTable newTable = PTableImpl.makePTable(oldTableRef.table, 
tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, 
isMultitenant, storeNulls);
-        return addTable(newTable);
+        PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), 
tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, 
isMultitenant, storeNulls);
+        return addTable(newTable, resolvedTime);
     }
 
     @Override
@@ -340,7 +327,7 @@ public class PMetaDataImpl implements PMetaData {
         }
         // also remove its reference from parent table
         if (parentTableRef != null) {
-            List<PTable> oldIndexes = parentTableRef.table.getIndexes();
+            List<PTable> oldIndexes = parentTableRef.getTable().getIndexes();
             if(oldIndexes != null && !oldIndexes.isEmpty()) {
                 List<PTable> newIndexes = 
Lists.newArrayListWithExpectedSize(oldIndexes.size());
                 newIndexes.addAll(oldIndexes);
@@ -349,13 +336,13 @@ public class PMetaDataImpl implements PMetaData {
                     if (index.getName().getString().equals(tableName)) {
                         newIndexes.remove(i);
                         PTable parentTable = PTableImpl.makePTable(
-                                parentTableRef.table,
-                                tableTimeStamp == HConstants.LATEST_TIMESTAMP 
? parentTableRef.table.getTimeStamp() : tableTimeStamp,
+                                parentTableRef.getTable(),
+                                tableTimeStamp == HConstants.LATEST_TIMESTAMP 
? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
                                 newIndexes);
                         if (tables == null) { 
                             tables = metaData.clone();
                         }
-                        tables.put(parentTable.getKey(), parentTable);
+                        tables.put(parentTable.getKey(), parentTable, 
parentTableRef.getResolvedTimeStamp());
                         break;
                     }
                 }
@@ -365,12 +352,12 @@ public class PMetaDataImpl implements PMetaData {
     }
     
     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, 
List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws 
SQLException {
+    public PMetaData removeColumn(PName tenantId, String tableName, 
List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long 
resolvedTime) throws SQLException {
         PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (tableRef == null) {
             return this;
         }
-        PTable table = tableRef.table;
+        PTable table = tableRef.getTable();
         PTableCache tables = metaData.clone();
         for (PColumn columnToRemove : columnsToRemove) {
             PColumn column;
@@ -399,7 +386,7 @@ public class PMetaDataImpl implements PMetaData {
             
             table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, 
columns);
         }
-        tables.put(table.getKey(), table);
+        tables.put(table.getKey(), table, resolvedTime);
         return new PMetaDataImpl(tables);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
new file mode 100644
index 0000000..83d0b42
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+public class PTableRef {
+    private final PTable table;
+    private final int estSize;
+       private volatile long lastAccessTime;
+       // timestamp (scn or txn timestamp at which rpc to fetch the table was 
made)
+    private long resolvedTimeStamp;
+    
+    public PTableRef(PTable table, long lastAccessTime, int estSize, long 
resolvedTime) {
+        this.table = table;
+        this.lastAccessTime = lastAccessTime;
+        this.estSize = estSize;
+        this.resolvedTimeStamp = resolvedTime;
+    }
+
+    public PTableRef(PTable table, long lastAccessTime, long resolvedTime) {
+        this (table, lastAccessTime, table.getEstimatedSize(), resolvedTime);
+    }
+
+    public PTableRef(PTableRef tableRef) {
+        this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize, 
tableRef.resolvedTimeStamp);
+    }
+    
+    public PTable getTable() {
+               return table;
+       }
+
+       public long getResolvedTimeStamp() {
+               return resolvedTimeStamp;
+       }
+       
+    public int getEstSize() {
+               return estSize;
+       }
+
+       public long getLastAccessTime() {
+               return lastAccessTime;
+       }
+
+       public void setLastAccessTime(long lastAccessTime) {
+               this.lastAccessTime = lastAccessTime;
+       }
+       
+       public void setResolvedTimeStamp(long resolvedTimeStamp) {
+               this.resolvedTimeStamp = resolvedTimeStamp;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index ec3e64b..316b6ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -113,7 +113,7 @@ public class TableRef {
         if (obj == null) return false;
         if (getClass() != obj.getClass()) return false;
         TableRef other = (TableRef)obj;
-        // FIXME: a null alias on either side should mean a wildcard and 
should not fail the equals check
+        // a null alias on either side should mean a wildcard and should not 
fail the equals check
         if ((alias == null && other.alias != null) || (alias != null && 
!alias.equals(other.alias))) return false;
         if 
(!table.getName().getString().equals(other.table.getName().getString())) return 
false;
         return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index b030510..4fdd597 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -268,7 +268,7 @@ public class PhoenixRuntime {
      */
     public static Iterator<Pair<byte[],List<KeyValue>>> 
getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) 
throws SQLException {
         final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        final Iterator<Pair<byte[],List<Mutation>>> iterator = 
pconn.getMutationState().toMutations(includeMutableIndexes);
+        final Iterator<Pair<byte[],List<Mutation>>> iterator = 
pconn.getMutationState().toMutations(includeMutableIndexes, null);
         return new Iterator<Pair<byte[],List<KeyValue>>>() {
 
             @Override
@@ -304,7 +304,7 @@ public class PhoenixRuntime {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         try {
             name = SchemaUtil.normalizeIdentifier(name);
-            table = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), name));
+            table = pconn.getTable(new PTableKey(pconn.getTenantId(), name));
         } catch (TableNotFoundException e) {
             String schemaName = SchemaUtil.getSchemaNameFromFullName(name);
             String tableName = SchemaUtil.getTableNameFromFullName(name);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 47db678..4dc792f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -489,15 +489,10 @@ public class SchemaUtil {
     }
 
     protected static PhoenixConnection addMetaDataColumn(PhoenixConnection 
conn, long scn, String columnDef) throws SQLException {
-        String url = conn.getURL();
-        Properties props = conn.getClientInfo();
-        PMetaData metaData = conn.getMetaDataCache();
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(scn));
         PhoenixConnection metaConnection = null;
-
         Statement stmt = null;
         try {
-            metaConnection = new PhoenixConnection(conn.getQueryServices(), 
url, props, metaData);
+            metaConnection = new PhoenixConnection(conn.getQueryServices(), 
conn, scn);
             try {
                 stmt = metaConnection.createStatement();
                 stmt.executeUpdate("ALTER TABLE SYSTEM.\"TABLE\" ADD IF NOT 
EXISTS " + columnDef);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index f013244..b0a8c9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -27,9 +27,13 @@ import co.cask.tephra.TransactionFailureException;
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 
 public class TransactionUtil {
@@ -38,10 +42,14 @@ public class TransactionUtil {
     
     private static final TransactionCodec codec = new TransactionCodec();
     
-    public static long translateMillis(long serverTimeStamp) {
+    public static long convertToNanoseconds(long serverTimeStamp) {
         return serverTimeStamp * 1000000;
     }
     
+    public static long convertToMillisecods(Long serverTimeStamp) {
+        return serverTimeStamp / 1000000;
+    }
+    
     public static byte[] encodeTxnState(Transaction txn) throws SQLException {
         try {
             return codec.encode(txn);
@@ -72,4 +80,34 @@ public class TransactionUtil {
        // Conflict detection is not needed for tables with 
write-once/append-only data
        return new TransactionAwareHTable(htable, table.isImmutableRows() ? 
TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
     }
+    
+       public static long getResolvedTimestamp(PhoenixConnection connection, 
boolean isTransactional, Long defaultResolvedTimestamp) {
+               Transaction transaction = 
connection.getMutationState().getTransaction();
+               Long scn = connection.getSCN();
+           return scn != null ?  scn : (isTransactional && transaction!=null) 
? convertToMillisecods(transaction.getReadPointer()) : defaultResolvedTimestamp;
+       }
+
+       public static long getResolvedTime(PhoenixConnection connection, 
MetaDataMutationResult result) {
+               PTable table = result.getTable();
+               boolean isTransactional = table!=null && 
table.isTransactional();
+               return getResolvedTimestamp(connection, isTransactional, 
result.getMutationTime());
+       }
+
+       public static long getTableTimestamp(PhoenixConnection connection, 
MetaDataMutationResult result) {
+               PTable table = result.getTable();
+               Transaction transaction = 
connection.getMutationState().getTransaction();
+               boolean transactional = table!=null && table.isTransactional();
+               return  (transactional && transaction!=null) ? 
convertToMillisecods(transaction.getReadPointer()) : result.getMutationTime();
+       }
+
+       public static Long getTableTimestamp(PhoenixConnection connection, 
boolean transactional, Long mutationTime) throws SQLException {
+               Long timestamp = mutationTime;
+               MutationState mutationState = connection.getMutationState();
+               if (transactional && mutationState.getTransaction()==null && 
connection.getSCN()==null) {
+                       mutationState.startTransaction();
+                       timestamp = 
convertToMillisecods(mutationState.getTransaction().getReadPointer());
+                       connection.commit();
+               }
+               return timestamp;
+       }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 7c36245..fbaa01e 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -151,7 +151,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
             String query = "CREATE TABLE t1 (k integer not null primary key, 
a.k decimal, b.k decimal)";
             conn.createStatement().execute(query);
             PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-            PColumn c = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), "T1")).getColumn("K");
+            PColumn c = pconn.getTable(new PTableKey(pconn.getTenantId(), 
"T1")).getColumn("K");
             assertTrue(SchemaUtil.isPKColumn(c));
         } finally {
             conn.close();
@@ -1162,7 +1162,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
     
     private void assertImmutableRows(Connection conn, String fullTableName, 
boolean expectedValue) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        assertEquals(expectedValue, pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
+        assertEquals(expectedValue, pconn.getTable(new 
PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows());
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
index 7a0bac6..5b457e7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java
@@ -31,20 +31,20 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 
 public class ViewCompilerTest extends BaseConnectionlessQueryTest {
     @Test
     public void testViewTypeCalculation() throws Exception {
-        assertViewType(new String[] {
+        assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] {
             "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 = 1 AND k2 = 'foo'",
             "CREATE VIEW v2 AS SELECT * FROM t WHERE k2 = 'foo'",
             "CREATE VIEW v3 AS SELECT * FROM t WHERE v = 'bar'||'bas'",
             "CREATE VIEW v4 AS SELECT * FROM t WHERE 'bar'=v and 5+3/2 = k1",
         }, ViewType.UPDATABLE);
-        assertViewType(new String[] {
+        assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] {
                 "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 < 1 AND k2 = 
'foo'",
                 "CREATE VIEW v2 AS SELECT * FROM t WHERE substr(k2,0,3) = 
'foo'",
                 "CREATE VIEW v3 AS SELECT * FROM t WHERE v = 
TO_CHAR(CURRENT_DATE())",
@@ -52,28 +52,27 @@ public class ViewCompilerTest extends 
BaseConnectionlessQueryTest {
             }, ViewType.READ_ONLY);
     }
     
-    public void assertViewType(String[] views, ViewType viewType) throws 
Exception {
+    public void assertViewType(String[] viewNames, String[] viewDDLs, ViewType 
viewType) throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         PhoenixConnection conn = DriverManager.getConnection(getUrl(), 
props).unwrap(PhoenixConnection.class);
         String ct = "CREATE TABLE t (k1 INTEGER NOT NULL, k2 VARCHAR, v 
VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2))";
         conn.createStatement().execute(ct);
         
-        for (String view : views) {
-            conn.createStatement().execute(view);
+        for (String viewDDL : viewDDLs) {
+            conn.createStatement().execute(viewDDL);
         }
         
         StringBuilder buf = new StringBuilder();
         int count = 0;
-        for (PTable table : conn.getMetaDataCache()) {
-            if (table.getType() == PTableType.VIEW) {
-                assertEquals(viewType, table.getViewType());
-                conn.createStatement().execute("DROP VIEW " + 
table.getName().getString());
-                buf.append(' ');
-                buf.append(table.getName().getString());
-                count++;
-            }
+        for (String view : viewNames) {
+               PTable table = conn.getTable(new PTableKey(null, view));
+            assertEquals(viewType, table.getViewType());
+            conn.createStatement().execute("DROP VIEW " + 
table.getName().getString());
+            buf.append(' ');
+            buf.append(table.getName().getString());
+            count++;
         }
-        assertEquals("Expected " + views.length + ", but got " + count + ":"+ 
buf.toString(), views.length, count);
+        assertEquals("Expected " + viewDDLs.length + ", but got " + count + 
":"+ buf.toString(), viewDDLs.length, count);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
index 29e14bf..cfd76bc 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.SortedMap;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.end2end.Shadower;
@@ -646,7 +647,7 @@ public class SkipScanBigFilterTest extends 
BaseConnectionlessQueryTest {
         }
         stmt.execute();
         
-        final PTable table = 
conn.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new 
PTableKey(null, "PERF.BIG_OLAP_DOC"));
+        final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, "PERF.BIG_OLAP_DOC"));
         GuidePostsInfo info = new GuidePostsInfo(0,Collections.<byte[]> 
emptyList(), 0l);
         for (byte[] gp : guidePosts) {
             info.addGuidePost(gp, 1000);
@@ -670,7 +671,7 @@ public class SkipScanBigFilterTest extends 
BaseConnectionlessQueryTest {
                 return table.getTimeStamp()+1;
             }
         });
-        conn.unwrap(PhoenixConnection.class).addTable(tableWithStats);
+        conn.unwrap(PhoenixConnection.class).addTable(tableWithStats, 
System.currentTimeMillis());
 
         String query = "SELECT count(1) cnt,\n" + 
                 "       coalesce(SUM(impressions), 0.0) AS \"impressions\",\n" 
+ 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 6e1c28f..e941ceb 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -100,8 +100,8 @@ public class IndexMaintainerTest  extends 
BaseConnectionlessQueryTest {
         try {
             conn.createStatement().execute("CREATE INDEX idx ON " + 
fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : 
"INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : 
indexProps));
             PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-            PTable table = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), fullTableName));
-            PTable index = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(),fullIndexName));
+            PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
fullTableName));
+            PTable index = pconn.getTable(new 
PTableKey(pconn.getTenantId(),fullIndexName));
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             table.getIndexMaintainers(ptr, pconn);
             List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, 
builder);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index abaaeb5..452ea4d 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -123,7 +123,7 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(HConstants.LATEST_TIMESTAMP));
         PhoenixConnection conn = 
DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, 
props).unwrap(PhoenixConnection.class);
         try {
-            PTable table = conn.getMetaDataCache().getTable(new 
PTableKey(null, ATABLE_NAME));
+            PTable table = conn.getTable(new PTableKey(null, ATABLE_NAME));
             ATABLE = table;
             ORGANIZATION_ID = new ColumnRef(new TableRef(table), 
table.getColumn("ORGANIZATION_ID").getPosition()).newColumnExpression();
             ENTITY_ID = new ColumnRef(new TableRef(table), 
table.getColumn("ENTITY_ID").getPosition()).newColumnExpression();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 3d6cb0c..da965c8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -142,6 +142,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.utils.Networks;
 import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.apache.twill.zookeeper.ZKClientServices;
@@ -498,10 +499,9 @@ public abstract class BaseTest {
         config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
         config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times");
         config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 
Networks.getRandomPort());
         config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, 
tmpFolder.newFolder().getAbsolutePath());
         config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 600);
-//        config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, 
ConnectionInfo.getZookeeperConnectionString(getUrl()));
-//        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp");
 
         ConnectionInfo connInfo = ConnectionInfo.create(getUrl());
         zkClient = ZKClientServices.delegate(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index cee1054..21e1f62 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -105,7 +105,7 @@ public class ParallelIteratorsSplitTest extends 
BaseConnectionlessQueryTest {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
-        PTable table = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), TABLE_NAME));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
TABLE_NAME));
         TableRef tableRef = new TableRef(table);
         List<HRegionLocation> regions = 
pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes());
         List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
index 9379ef3..405a73c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java
@@ -32,7 +32,7 @@ public class PMetaDataImplTest {
     
     private static PMetaData addToTable(PMetaData metaData, String name, int 
size) throws SQLException {
         PTable table = new PSizedTable(new PTableKey(null,name), size);
-        return metaData.addTable(table);
+        return metaData.addTable(table, System.currentTimeMillis());
     }
     
     private static PMetaData removeFromTable(PMetaData metaData, String name) 
throws SQLException {
@@ -40,7 +40,7 @@ public class PMetaDataImplTest {
     }
     
     private static PTable getFromTable(PMetaData metaData, String name) throws 
TableNotFoundException {
-        return metaData.getTable(new PTableKey(null,name));
+        return metaData.getTableRef(new PTableKey(null,name)).getTable();
     }
     
     private static void assertNames(PMetaData metaData, String... names) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
index bcd08f0..6977103 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java
@@ -56,7 +56,7 @@ public class RowKeySchemaTest  extends 
BaseConnectionlessQueryTest  {
         String fullTableName = 
SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + 
dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + "))  " + 
(dataProps.isEmpty() ? "" : dataProps) );
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        PTable table = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), fullTableName));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
fullTableName));
         conn.close();
         StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName  
+ " VALUES(");
         for (int i = 0; i < values.length; i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
index 23ec4bf..7ab72d6 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java
@@ -52,7 +52,7 @@ public class RowKeyValueAccessorTest  extends 
BaseConnectionlessQueryTest  {
         String fullTableName = 
SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName));
         conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + 
dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + "))  " + 
(dataProps.isEmpty() ? "" : dataProps) );
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-        PTable table = pconn.getMetaDataCache().getTable(new 
PTableKey(pconn.getTenantId(), fullTableName));
+        PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), 
fullTableName));
         conn.close();
         StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName  
+ " VALUES(");
         for (int i = 0; i < values.length; i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 9fbb8c9..bead2df 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -25,14 +25,17 @@ import static 
org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PAR
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -567,5 +570,27 @@ public class TestUtil {
         analyzeTable(conn, tableName);
         conn.close();
     }
+    
+    public static void setRowKeyColumns(PreparedStatement stmt, int i) throws 
SQLException {
+        // insert row
+        stmt.setString(1, "varchar" + String.valueOf(i));
+        stmt.setString(2, "char" + String.valueOf(i));
+        stmt.setInt(3, i);
+        stmt.setLong(4, i);
+        stmt.setBigDecimal(5, new BigDecimal(i*0.5d));
+        Date date = new Date(DateUtil.parseDate("2015-01-01 
00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+        stmt.setDate(6, date);
+    }
+       
+    public static void validateRowKeyColumns(ResultSet rs, int i) throws 
SQLException {
+               assertTrue(rs.next());
+               assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
+               assertEquals(rs.getString(2), "char" + String.valueOf(i));
+               assertEquals(rs.getInt(3), i);
+               assertEquals(rs.getInt(4), i);
+               assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
+               Date date = new Date(DateUtil.parseDate("2015-01-01 
00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
+               assertEquals(rs.getDate(6), date);
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 731955e..6c248b1 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -165,6 +165,13 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
     </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <version>${tephra.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-pig/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pig/pom.xml b/phoenix-pig/pom.xml
index ea83513..530236c 100644
--- a/phoenix-pig/pom.xml
+++ b/phoenix-pig/pom.xml
@@ -143,6 +143,13 @@
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <version>${tephra.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ace160..afb6df3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <htrace.version>2.04</htrace.version>
     <collections.version>3.2.1</collections.version>
     <jodatime.version>2.3</jodatime.version>
+    <tephra.version>0.6.1</tephra.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>

Reply via email to