Repository: phoenix Updated Branches: refs/heads/txn 81e52e85b -> 92ee51a0d
http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 1608548..f2423ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -145,6 +145,7 @@ import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.PrimaryKeyConstraint; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.UpdateStatisticsStatement; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.ConnectionQueryServices.Feature; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -164,6 +165,7 @@ import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,7 +297,7 @@ public class MetaDataClient { MetaDataMutationResult result = updateCache(schemaName, tableName, true); return result.getMutationTime(); } - + /** * Update the cache with the latest as of the connection scn. * @param schemaName @@ -314,30 +316,50 @@ public class MetaDataClient { public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException { return updateCache(tenantId, schemaName, tableName, false); } - - private long getClientTimeStamp() { - Long scn = connection.getSCN(); - long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; - return clientTimeStamp; + + public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException { + return updateCache(tenantId, schemaName, tableName, alwaysHitServer, null); } - + + private long getCurrentScn() { + Long scn = connection.getSCN(); + long currentScn = scn == null ? HConstants.LATEST_TIMESTAMP : scn; + return currentScn; + } + private MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, - boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez - long clientTimeStamp = getClientTimeStamp(); + boolean alwaysHitServer, Long resolvedTimestamp) throws SQLException { // TODO: pass byte[] herez boolean systemTable = SYSTEM_CATALOG_SCHEMA.equals(schemaName); // System tables must always have a null tenantId tenantId = systemTable ? null : tenantId; PTable table = null; + PTableRef tableRef = null; String fullTableName = SchemaUtil.getTableName(schemaName, tableName); long tableTimestamp = HConstants.LATEST_TIMESTAMP; + long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP; try { - table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, fullTableName)); + tableRef = connection.getTableRef(new PTableKey(tenantId, fullTableName)); + table = tableRef.getTable(); tableTimestamp = table.getTimeStamp(); + tableResolvedTimestamp = tableRef.getResolvedTimeStamp(); } catch (TableNotFoundException e) { } - // Don't bother with server call: we can't possibly find a newer table - if (table != null && !alwaysHitServer && (systemTable || tableTimestamp == clientTimeStamp - 1)) { - return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,QueryConstants.UNSET_TIMESTAMP,table); + + boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean( + QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, + QueryServicesOptions.DEFAULT_TRANSACTIONAL); + // start a txn if all table are transactional by default or if we found the table in the cache and it is transactional + // TODO if system tables become transactional remove the check + boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional()); + if (!systemTable && isTransactional && connection.getMutationState().getTransaction()==null) + connection.getMutationState().startTransaction(); + resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp; + // Do not make rpc to getTable if + // 1. table is a system table + // 2. table was already resolved as of that timestamp + if (table != null && !alwaysHitServer + && (systemTable || resolvedTimestamp == tableResolvedTimestamp)) { + return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table); } int maxTryCount = tenantId == null ? 1 : 2; @@ -347,7 +369,12 @@ public class MetaDataClient { do { final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName); final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName); - result = connection.getQueryServices().getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, clientTimeStamp); + ConnectionQueryServices queryServices = connection.getQueryServices(); + result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, resolvedTimestamp); + // if the table was assumed to be transactional, but is actually not transactional then re-resolve as of the right timestamp (and vice versa) + if (table==null && result.getTable()!=null && result.getTable().isTransactional()!=isTransactional) { + result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, TransactionUtil.getResolvedTimestamp(connection, result.getTable().isTransactional(), HConstants.LATEST_TIMESTAMP)); + } if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) { return result; @@ -372,14 +399,19 @@ public class MetaDataClient { // server again. if (table != null) { // Ensures that table in result is set to table found in our cache. - result.setTable(table); if (code == MutationCode.TABLE_ALREADY_EXISTS) { - // Although this table is up-to-date, the parent table may not be. - // In this case, we update the parent table which may in turn pull - // in indexes to add to this table. - if (addIndexesFromPhysicalTable(result)) { - connection.addTable(result.getTable()); - } + result.setTable(table); + // Although this table is up-to-date, the parent table may not be. + // In this case, we update the parent table which may in turn pull + // in indexes to add to this table. + long resolvedTime = TransactionUtil.getResolvedTime(connection, result); + if (addIndexesFromPhysicalTable(result, resolvedTimestamp)) { + connection.addTable(result.getTable(), resolvedTime); + } + else { + // if we aren't adding the table, we still need to update the resolved time of the table + connection.updateResolvedTimestamp(table, resolvedTime); + } return result; } // If table was not found at the current time stamp and we have one cached, remove it. @@ -400,10 +432,11 @@ public class MetaDataClient { * of the table for which we just updated. * TODO: combine this round trip with the one that updates the cache for the child table. * @param result the result from updating the cache for the current table. + * @param resolvedTimestamp timestamp at which child table was resolved * @return true if the PTable contained by result was modified and false otherwise * @throws SQLException if the physical table cannot be found */ - private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result) throws SQLException { + private boolean addIndexesFromPhysicalTable(MetaDataMutationResult result, Long resolvedTimestamp) throws SQLException { PTable table = result.getTable(); // If not a view or if a view directly over an HBase table, there's nothing to do if (table.getType() != PTableType.VIEW || table.getViewType() == ViewType.MAPPED) { @@ -412,7 +445,7 @@ public class MetaDataClient { String physicalName = table.getPhysicalName().getString(); String schemaName = SchemaUtil.getSchemaNameFromFullName(physicalName); String tableName = SchemaUtil.getTableNameFromFullName(physicalName); - MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false); + MetaDataMutationResult parentResult = updateCache(null, schemaName, tableName, false, resolvedTimestamp); PTable physicalTable = parentResult.getTable(); if (physicalTable == null) { throw new TableNotFoundException(schemaName, tableName); @@ -1085,19 +1118,19 @@ public class MetaDataClient { // as there's no need to burn another sequence value. if (allocateIndexId && indexId == null) { Long scn = connection.getSCN(); - long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; PName tenantId = connection.getTenantId(); String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString(); PName physicalName = dataTable.getPhysicalName(); int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets(); SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName, nSequenceSaltBuckets); - // Create at parent timestamp as we know that will be earlier than now - // and earlier than any SCN if one is set. + // if scn is set create at scn-1, so we can see the sequence or else use latest timestamp (so that latest server time is used) + long sequenceTimestamp = scn!=null ? scn-1 : HConstants.LATEST_TIMESTAMP; createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(), true, Short.MIN_VALUE, 1, 1, false, Long.MIN_VALUE, Long.MAX_VALUE, - dataTable.getTimeStamp()); + sequenceTimestamp); long[] seqValues = new long[1]; SQLException[] sqlExceptions = new SQLException[1]; + long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; connection.getQueryServices().incrementSequences(Collections.singletonList(key), timestamp, seqValues, sqlExceptions); if (sqlExceptions[0] != null) { throw sqlExceptions[0]; @@ -1212,8 +1245,10 @@ public class MetaDataClient { boolean isImmutableRows = false; List<PName> physicalNames = Collections.emptyList(); boolean addSaltColumn = false; + Long timestamp = null; if (parent != null) { transactional = parent.isTransactional(); + timestamp = TransactionUtil.getTableTimestamp(connection, transactional, null); storeNulls = parent.getStoreNulls(); if (tableType == PTableType.INDEX) { // Index on view @@ -1246,7 +1281,7 @@ public class MetaDataClient { incrementStatement.execute(); // Get list of mutations and add to table meta data that will be passed to server // to guarantee order. This row will always end up last - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); connection.rollback(); // Add row linking from data table row to index table row @@ -1366,6 +1401,7 @@ public class MetaDataClient { .build().buildException(); } } + timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional, null) : timestamp; // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views if (statement.getTableType() == PTableType.VIEW || indexId != null) { @@ -1600,7 +1636,7 @@ public class MetaDataClient { Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, false); - connection.addTable(table); + connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP); } else if (tableType == PTableType.INDEX && indexId == null) { if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) { int nIndexRowKeyColumns = isPK ? 1 : pkColumnsNames.size(); @@ -1655,7 +1691,7 @@ public class MetaDataClient { addColumnMutation(schemaName, tableName, column, colUpsert, parentTableName, pkName, keySeq, saltBucketNum != null); } - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); connection.rollback(); String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString(); @@ -1699,7 +1735,7 @@ public class MetaDataClient { tableUpsert.setBoolean(20, transactional); tableUpsert.execute(); - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond()); connection.rollback(); /* @@ -1747,7 +1783,7 @@ public class MetaDataClient { default: PName newSchemaName = PNameFactory.newName(schemaName); PTable table = PTableImpl.makePTable( - tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, result.getMutationTime(), + tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(), PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType, @@ -1862,6 +1898,8 @@ public class MetaDataClient { MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade); MutationCode code = result.getMutationCode(); + PTable table = result.getTable(); + boolean transactional = table!=null && table.isTransactional(); switch (code) { case TABLE_NOT_FOUND: if (!ifExists) { throw new TableNotFoundException(schemaName, tableName); } @@ -1873,12 +1911,11 @@ public class MetaDataClient { .setSchemaName(schemaName).setTableName(tableName).build().buildException(); default: - connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, - result.getMutationTime()); + connection.removeTable(tenantId, SchemaUtil.getTableName(schemaName, tableName), parentTableName, + TransactionUtil.getTableTimestamp(connection, transactional, result.getMutationTime())); if (result.getTable() != null && tableType != PTableType.VIEW) { connection.setAutoCommit(true); - PTable table = result.getTable(); boolean dropMetaData = result.getTable().getViewIndexId() == null && connection.getQueryServices().getProps().getBoolean(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA); long ts = (scn == null ? result.getMutationTime() : scn); @@ -2094,7 +2131,9 @@ public class MetaDataClient { ListMultimap<String,Pair<String,Object>> stmtProperties = statement.getProps(); Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size()); - PTable table = FromCompiler.getResolver(statement, connection).getTables().get(0).getTable(); + TableRef tableRef = FromCompiler.getResolver(statement, connection).getTables().get(0); + PTable table = tableRef.getTable(); + Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null; List<ColumnDef> columnDefs = statement.getColumnDefs(); if (columnDefs == null) { columnDefs = Collections.emptyList(); @@ -2236,7 +2275,7 @@ public class MetaDataClient { } } - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } else { // Check that HBase configured properly for mutable secondary indexing @@ -2260,13 +2299,13 @@ public class MetaDataClient { for (PTable index : table.getIndexes()) { incrementTableSeqNum(index, index.getType(), 1); } - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } long seqNum = table.getSequenceNumber(); if (changingPhoenixTableProperty || columnDefs.size() > 0) { seqNum = incrementTableSeqNum(table, statement.getTableType(), 1, isImmutableRows, disableWAL, multiTenant, storeNulls); - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } @@ -2305,7 +2344,17 @@ public class MetaDataClient { // Only update client side cache if we aren't adding a PK column to a table with indexes. // We could update the cache manually then too, it'd just be a pain. if (numPkColumnsAdded==0 || table.getIndexes().isEmpty()) { - connection.addColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName), columns, result.getMutationTime(), seqNum, isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, disableWAL == null ? table.isWALDisabled() : disableWAL, multiTenant == null ? table.isMultiTenant() : multiTenant, storeNulls == null ? table.getStoreNulls() : storeNulls); + connection.addColumn( + tenantId, + SchemaUtil.getTableName(schemaName, tableName), + columns, + result.getMutationTime(), + seqNum, + isImmutableRows == null ? table.isImmutableRows() : isImmutableRows, + disableWAL == null ? table.isWALDisabled() : disableWAL, + multiTenant == null ? table.isMultiTenant() : multiTenant, + storeNulls == null ? table.getStoreNulls() : storeNulls, + TransactionUtil.getResolvedTime(connection, result)); } // Delete rows in view index if we haven't dropped it already // We only need to do this if the multiTenant transitioned to false @@ -2436,12 +2485,12 @@ public class MetaDataClient { boolean retried = false; while (true) { final ColumnResolver resolver = FromCompiler.getResolver(statement, connection); - PTable table = resolver.getTables().get(0).getTable(); + TableRef tableRef = resolver.getTables().get(0); + PTable table = tableRef.getTable(); List<ColumnName> columnRefs = statement.getColumnRefs(); if(columnRefs == null) { columnRefs = Lists.newArrayListWithCapacity(0); } - TableRef tableRef = null; List<ColumnRef> columnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size() + table.getIndexes().size()); List<TableRef> indexesToDrop = Lists.newArrayListWithExpectedSize(table.getIndexes().size()); List<Mutation> tableMetaData = Lists.newArrayListWithExpectedSize((table.getIndexes().size() + 1) * (1 + table.getColumns().size() - columnRefs.size())); @@ -2457,30 +2506,31 @@ public class MetaDataClient { } throw e; } - tableRef = columnRef.getTableRef(); PColumn columnToDrop = columnRef.getColumn(); tableColumnsToDrop.add(columnToDrop); if (SchemaUtil.isPKColumn(columnToDrop)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_DROP_PK) .setColumnName(columnToDrop.getName().getString()).build().buildException(); } - columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition())); + columnsToDrop.add(new ColumnRef(columnRef.getTableRef(), columnToDrop.getPosition())); } dropColumnMutations(table, tableColumnsToDrop, tableMetaData); for (PTable index : table.getIndexes()) { + IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection); + // get the columns required for the index pk + Set<ColumnReference> indexColumns = indexMaintainer.getIndexedColumns(); + // get the covered columns + Set<ColumnReference> coveredColumns = indexMaintainer.getCoverededColumns(); List<PColumn> indexColumnsToDrop = Lists.newArrayListWithExpectedSize(columnRefs.size()); for(PColumn columnToDrop : tableColumnsToDrop) { - String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); - try { - PColumn indexColumn = index.getColumn(indexColumnName); - if (SchemaUtil.isPKColumn(indexColumn)) { - indexesToDrop.add(new TableRef(index)); - } else { - indexColumnsToDrop.add(indexColumn); - columnsToDrop.add(new ColumnRef(tableRef, columnToDrop.getPosition())); - } - } catch (ColumnNotFoundException e) { + ColumnReference columnToDropRef = new ColumnReference(columnToDrop.getFamilyName().getBytes(), columnToDrop.getName().getBytes()); + if (indexColumns.contains(columnToDropRef)) { + indexesToDrop.add(new TableRef(index)); + } + else if (coveredColumns.contains(columnToDropRef)) { + String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop); + indexColumnsToDrop.add(index.getColumn(indexColumnName)); } } if(!indexColumnsToDrop.isEmpty()) { @@ -2489,11 +2539,12 @@ public class MetaDataClient { } } - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + Long timeStamp = table.isTransactional() ? tableRef.getTimeStamp() : null; + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); long seqNum = incrementTableSeqNum(table, statement.getTableType(), -1); - tableMetaData.addAll(connection.getMutationState().toMutations().next().getSecond()); + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); // Force table header to be first in list Collections.reverse(tableMetaData); @@ -2544,7 +2595,7 @@ public class MetaDataClient { // client-side cache as it would be too painful. Just let it pull it over from // the server when needed. if (tableColumnsToDrop.size() > 0 && indexesToDrop.isEmpty()) { - connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum); + connection.removeColumn(tenantId, SchemaUtil.getTableName(schemaName, tableName) , tableColumnsToDrop, result.getMutationTime(), seqNum, TransactionUtil.getResolvedTime(connection, result)); } // If we have a VIEW, then only delete the metadata, and leave the table data alone if (table.getType() != PTableType.VIEW) { @@ -2586,7 +2637,7 @@ public class MetaDataClient { if (retried) { throw e; } - table = connection.getMetaDataCache().getTable(new PTableKey(tenantId, fullTableName)); + table = connection.getTable(new PTableKey(tenantId, fullTableName)); retried = true; } } @@ -2629,7 +2680,8 @@ public class MetaDataClient { tableUpsert.close(); } } - List<Mutation> tableMetadata = connection.getMutationState().toMutations().next().getSecond(); + Long timeStamp = indexRef.getTable().isTransactional() ? indexRef.getTimeStamp() : null; + List<Mutation> tableMetadata = connection.getMutationState().toMutations(timeStamp).next().getSecond(); connection.rollback(); MetaDataMutationResult result = connection.getQueryServices().updateIndexState(tableMetadata, dataTableName); @@ -2675,9 +2727,9 @@ public class MetaDataClient { } private PTable addTableToCache(MetaDataMutationResult result) throws SQLException { - addIndexesFromPhysicalTable(result); + addIndexesFromPhysicalTable(result, null); PTable table = result.getTable(); - connection.addTable(table); + connection.addTable(table, TransactionUtil.getResolvedTime(connection, result)); return table; } @@ -2700,13 +2752,14 @@ public class MetaDataClient { */ boolean isSharedIndex = table.getViewIndexId() != null; if (isSharedIndex) { - return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getClientTimeStamp()); + // we are assuming the stats table is not transactional + return connection.getQueryServices().getTableStats(table.getPhysicalName().getBytes(), getCurrentScn()); } boolean isView = table.getType() == PTableType.VIEW; String physicalName = table.getPhysicalName().getString(); if (isView && table.getViewType() != ViewType.MAPPED) { try { - return connection.getMetaDataCache().getTable(new PTableKey(null, physicalName)).getTableStats(); + return connection.getTable(new PTableKey(null, physicalName)).getTableStats(); } catch (TableNotFoundException e) { // Possible when the table timestamp == current timestamp - 1. // This would be most likely during the initial index build of a view index http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java index c104473..207bc2a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.schema; +import java.sql.SQLException; + import org.apache.phoenix.query.MetaDataMutated; @@ -26,6 +28,6 @@ public interface PMetaData extends MetaDataMutated, Iterable<PTable>, Cloneable } public int size(); public PMetaData clone(); - public PTable getTable(PTableKey key) throws TableNotFoundException; + public PTableRef getTableRef(PTableKey key) throws TableNotFoundException; public PMetaData pruneTables(Pruner pruner); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java index 2f84c95..8cfbb18 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java @@ -40,32 +40,12 @@ import com.google.common.primitives.Longs; * */ public class PMetaDataImpl implements PMetaData { - private static final class PTableRef { - public final PTable table; - public final int estSize; - public volatile long lastAccessTime; - - public PTableRef(PTable table, long lastAccessTime, int estSize) { - this.table = table; - this.lastAccessTime = lastAccessTime; - this.estSize = estSize; - } - - public PTableRef(PTable table, long lastAccessTime) { - this (table, lastAccessTime, table.getEstimatedSize()); - } - - public PTableRef(PTableRef tableRef) { - this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize); - } - } - private static class PTableCache implements Cloneable { private static final int MIN_REMOVAL_SIZE = 3; private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() { @Override public int compare(PTableRef tableRef1, PTableRef tableRef2) { - return Longs.compare(tableRef1.lastAccessTime, tableRef2.lastAccessTime); + return Longs.compare(tableRef1.getLastAccessTime(), tableRef2.getLastAccessTime()); } }; private static final MinMaxPriorityQueue.Builder<PTableRef> BUILDER = MinMaxPriorityQueue.orderedBy(COMPARATOR); @@ -88,7 +68,7 @@ public class PMetaDataImpl implements PMetaData { Map<PTableKey,PTableRef> newTables = newMap(Math.max(tables.size(),expectedCapacity)); // Copy value so that access time isn't changing anymore for (PTableRef tableAccess : tables.values()) { - newTables.put(tableAccess.table.getKey(), new PTableRef(tableAccess)); + newTables.put(tableAccess.getTable().getKey(), new PTableRef(tableAccess)); } return newTables; } @@ -114,7 +94,7 @@ public class PMetaDataImpl implements PMetaData { if (tableAccess == null) { return null; } - tableAccess.lastAccessTime = timeKeeper.getCurrentTime(); + tableAccess.setLastAccessTime(timeKeeper.getCurrentTime()); return tableAccess; } @@ -138,37 +118,37 @@ public class PMetaDataImpl implements PMetaData { // Add to new cache, but track references to remove when done // to bring cache at least overage amount below it's max size. for (PTableRef tableRef : tables.values()) { - newCache.put(tableRef.table.getKey(), new PTableRef(tableRef)); + newCache.put(tableRef.getTable().getKey(), new PTableRef(tableRef)); toRemove.add(tableRef); - toRemoveBytes += tableRef.estSize; - if (toRemoveBytes - toRemove.peekLast().estSize > overage) { + toRemoveBytes += tableRef.getEstSize(); + if (toRemoveBytes - toRemove.peekLast().getEstSize() > overage) { PTableRef removedRef = toRemove.removeLast(); - toRemoveBytes -= removedRef.estSize; + toRemoveBytes -= removedRef.getEstSize(); } } for (PTableRef toRemoveRef : toRemove) { - newCache.remove(toRemoveRef.table.getKey()); + newCache.remove(toRemoveRef.getTable().getKey()); } return newCache; } private PTable put(PTableKey key, PTableRef ref) { - currentByteSize += ref.estSize; + currentByteSize += ref.getEstSize(); PTableRef oldTableAccess = tables.put(key, ref); PTable oldTable = null; if (oldTableAccess != null) { - currentByteSize -= oldTableAccess.estSize; - oldTable = oldTableAccess.table; + currentByteSize -= oldTableAccess.getEstSize(); + oldTable = oldTableAccess.getTable(); } return oldTable; } - public PTable put(PTableKey key, PTable value) { - return put(key, new PTableRef(value, timeKeeper.getCurrentTime())); + public PTable put(PTableKey key, PTable value, long resolvedTime) { + return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), resolvedTime)); } - public PTable putDuplicate(PTableKey key, PTable value) { - return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0)); + public PTable putDuplicate(PTableKey key, PTable value, long resolvedTime) { + return put(key, new PTableRef(value, timeKeeper.getCurrentTime(), 0, resolvedTime)); } public PTable remove(PTableKey key) { @@ -176,8 +156,8 @@ public class PMetaDataImpl implements PMetaData { if (value == null) { return null; } - currentByteSize -= value.estSize; - return value.table; + currentByteSize -= value.getEstSize(); + return value.getTable(); } public Iterator<PTable> iterator() { @@ -191,7 +171,7 @@ public class PMetaDataImpl implements PMetaData { @Override public PTable next() { - return iterator.next().table; + return iterator.next().getTable(); } @Override @@ -235,12 +215,12 @@ public class PMetaDataImpl implements PMetaData { } @Override - public PTable getTable(PTableKey key) throws TableNotFoundException { + public PTableRef getTableRef(PTableKey key) throws TableNotFoundException { PTableRef ref = metaData.get(key); if (ref == null) { throw new TableNotFoundException(key.getName()); } - return ref.table; + return ref; } @Override @@ -248,22 +228,29 @@ public class PMetaDataImpl implements PMetaData { return metaData.size(); } + @Override + public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException { + PTableCache clone = metaData.clone(); + clone.putDuplicate(table.getKey(), table, resolvedTimestamp); + return new PMetaDataImpl(clone); + } @Override - public PMetaData addTable(PTable table) throws SQLException { + public PMetaData addTable(PTable table, long resolvedTime) throws SQLException { int netGain = 0; PTableKey key = table.getKey(); PTableRef oldTableRef = metaData.get(key); if (oldTableRef != null) { - netGain -= oldTableRef.estSize; + netGain -= oldTableRef.getEstSize(); } PTable newParentTable = null; + long parentResolvedTimestamp = resolvedTime; if (table.getParentName() != null) { // Upsert new index table into parent data table list String parentName = table.getParentName().getString(); PTableRef oldParentRef = metaData.get(new PTableKey(table.getTenantId(), parentName)); // If parentTable isn't cached, that's ok we can skip this if (oldParentRef != null) { - List<PTable> oldIndexes = oldParentRef.table.getIndexes(); + List<PTable> oldIndexes = oldParentRef.getTable().getIndexes(); List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1); newIndexes.addAll(oldIndexes); for (int i = 0; i < newIndexes.size(); i++) { @@ -274,8 +261,8 @@ public class PMetaDataImpl implements PMetaData { } } newIndexes.add(table); - netGain -= oldParentRef.estSize; - newParentTable = PTableImpl.makePTable(oldParentRef.table, table.getTimeStamp(), newIndexes); + netGain -= oldParentRef.getEstSize(); + newParentTable = PTableImpl.makePTable(oldParentRef.getTable(), table.getTimeStamp(), newIndexes); netGain += newParentTable.getEstimatedSize(); } } @@ -286,24 +273,24 @@ public class PMetaDataImpl implements PMetaData { PTableCache tables = overage <= 0 ? metaData.clone() : metaData.cloneMinusOverage(overage); if (newParentTable != null) { // Upsert new index table into parent data table list - tables.put(newParentTable.getKey(), newParentTable); - tables.putDuplicate(table.getKey(), table); + tables.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp); + tables.putDuplicate(table.getKey(), table, resolvedTime); } else { - tables.put(table.getKey(), table); + tables.put(table.getKey(), table, resolvedTime); } for (PTable index : table.getIndexes()) { - tables.putDuplicate(index.getKey(), index); + tables.putDuplicate(index.getKey(), index, resolvedTime); } return new PMetaDataImpl(tables); } @Override - public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException { + public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException { PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName)); if (oldTableRef == null) { return this; } - List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.table); + List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.getTable()); List<PColumn> newColumns; if (columnsToAdd.isEmpty()) { newColumns = oldColumns; @@ -312,8 +299,8 @@ public class PMetaDataImpl implements PMetaData { newColumns.addAll(oldColumns); newColumns.addAll(columnsToAdd); } - PTable newTable = PTableImpl.makePTable(oldTableRef.table, tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls); - return addTable(newTable); + PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum, newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls); + return addTable(newTable, resolvedTime); } @Override @@ -340,7 +327,7 @@ public class PMetaDataImpl implements PMetaData { } // also remove its reference from parent table if (parentTableRef != null) { - List<PTable> oldIndexes = parentTableRef.table.getIndexes(); + List<PTable> oldIndexes = parentTableRef.getTable().getIndexes(); if(oldIndexes != null && !oldIndexes.isEmpty()) { List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size()); newIndexes.addAll(oldIndexes); @@ -349,13 +336,13 @@ public class PMetaDataImpl implements PMetaData { if (index.getName().getString().equals(tableName)) { newIndexes.remove(i); PTable parentTable = PTableImpl.makePTable( - parentTableRef.table, - tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.table.getTimeStamp() : tableTimeStamp, + parentTableRef.getTable(), + tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp, newIndexes); if (tables == null) { tables = metaData.clone(); } - tables.put(parentTable.getKey(), parentTable); + tables.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp()); break; } } @@ -365,12 +352,12 @@ public class PMetaDataImpl implements PMetaData { } @Override - public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum) throws SQLException { + public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException { PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName)); if (tableRef == null) { return this; } - PTable table = tableRef.table; + PTable table = tableRef.getTable(); PTableCache tables = metaData.clone(); for (PColumn columnToRemove : columnsToRemove) { PColumn column; @@ -399,7 +386,7 @@ public class PMetaDataImpl implements PMetaData { table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns); } - tables.put(table.getKey(), table); + tables.put(table.getKey(), table, resolvedTime); return new PMetaDataImpl(tables); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java new file mode 100644 index 0000000..83d0b42 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableRef.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema; + +public class PTableRef { + private final PTable table; + private final int estSize; + private volatile long lastAccessTime; + // timestamp (scn or txn timestamp at which rpc to fetch the table was made) + private long resolvedTimeStamp; + + public PTableRef(PTable table, long lastAccessTime, int estSize, long resolvedTime) { + this.table = table; + this.lastAccessTime = lastAccessTime; + this.estSize = estSize; + this.resolvedTimeStamp = resolvedTime; + } + + public PTableRef(PTable table, long lastAccessTime, long resolvedTime) { + this (table, lastAccessTime, table.getEstimatedSize(), resolvedTime); + } + + public PTableRef(PTableRef tableRef) { + this (tableRef.table, tableRef.lastAccessTime, tableRef.estSize, tableRef.resolvedTimeStamp); + } + + public PTable getTable() { + return table; + } + + public long getResolvedTimeStamp() { + return resolvedTimeStamp; + } + + public int getEstSize() { + return estSize; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void setLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + public void setResolvedTimeStamp(long resolvedTimeStamp) { + this.resolvedTimeStamp = resolvedTimeStamp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java index ec3e64b..316b6ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java @@ -113,7 +113,7 @@ public class TableRef { if (obj == null) return false; if (getClass() != obj.getClass()) return false; TableRef other = (TableRef)obj; - // FIXME: a null alias on either side should mean a wildcard and should not fail the equals check + // a null alias on either side should mean a wildcard and should not fail the equals check if ((alias == null && other.alias != null) || (alias != null && !alias.equals(other.alias))) return false; if (!table.getName().getString().equals(other.table.getName().getString())) return false; return true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index b030510..4fdd597 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -268,7 +268,7 @@ public class PhoenixRuntime { */ public static Iterator<Pair<byte[],List<KeyValue>>> getUncommittedDataIterator(Connection conn, boolean includeMutableIndexes) throws SQLException { final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(includeMutableIndexes); + final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(includeMutableIndexes, null); return new Iterator<Pair<byte[],List<KeyValue>>>() { @Override @@ -304,7 +304,7 @@ public class PhoenixRuntime { PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); try { name = SchemaUtil.normalizeIdentifier(name); - table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), name)); + table = pconn.getTable(new PTableKey(pconn.getTenantId(), name)); } catch (TableNotFoundException e) { String schemaName = SchemaUtil.getSchemaNameFromFullName(name); String tableName = SchemaUtil.getTableNameFromFullName(name); http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index 47db678..4dc792f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -489,15 +489,10 @@ public class SchemaUtil { } protected static PhoenixConnection addMetaDataColumn(PhoenixConnection conn, long scn, String columnDef) throws SQLException { - String url = conn.getURL(); - Properties props = conn.getClientInfo(); - PMetaData metaData = conn.getMetaDataCache(); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); PhoenixConnection metaConnection = null; - Statement stmt = null; try { - metaConnection = new PhoenixConnection(conn.getQueryServices(), url, props, metaData); + metaConnection = new PhoenixConnection(conn.getQueryServices(), conn, scn); try { stmt = metaConnection.createStatement(); stmt.executeUpdate("ALTER TABLE SYSTEM.\"TABLE\" ADD IF NOT EXISTS " + columnDef); http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index f013244..b0a8c9b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -27,9 +27,13 @@ import co.cask.tephra.TransactionFailureException; import co.cask.tephra.TxConstants; import co.cask.tephra.hbase98.TransactionAwareHTable; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PTable; public class TransactionUtil { @@ -38,10 +42,14 @@ public class TransactionUtil { private static final TransactionCodec codec = new TransactionCodec(); - public static long translateMillis(long serverTimeStamp) { + public static long convertToNanoseconds(long serverTimeStamp) { return serverTimeStamp * 1000000; } + public static long convertToMillisecods(Long serverTimeStamp) { + return serverTimeStamp / 1000000; + } + public static byte[] encodeTxnState(Transaction txn) throws SQLException { try { return codec.encode(txn); @@ -72,4 +80,34 @@ public class TransactionUtil { // Conflict detection is not needed for tables with write-once/append-only data return new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); } + + public static long getResolvedTimestamp(PhoenixConnection connection, boolean isTransactional, Long defaultResolvedTimestamp) { + Transaction transaction = connection.getMutationState().getTransaction(); + Long scn = connection.getSCN(); + return scn != null ? scn : (isTransactional && transaction!=null) ? convertToMillisecods(transaction.getReadPointer()) : defaultResolvedTimestamp; + } + + public static long getResolvedTime(PhoenixConnection connection, MetaDataMutationResult result) { + PTable table = result.getTable(); + boolean isTransactional = table!=null && table.isTransactional(); + return getResolvedTimestamp(connection, isTransactional, result.getMutationTime()); + } + + public static long getTableTimestamp(PhoenixConnection connection, MetaDataMutationResult result) { + PTable table = result.getTable(); + Transaction transaction = connection.getMutationState().getTransaction(); + boolean transactional = table!=null && table.isTransactional(); + return (transactional && transaction!=null) ? convertToMillisecods(transaction.getReadPointer()) : result.getMutationTime(); + } + + public static Long getTableTimestamp(PhoenixConnection connection, boolean transactional, Long mutationTime) throws SQLException { + Long timestamp = mutationTime; + MutationState mutationState = connection.getMutationState(); + if (transactional && mutationState.getTransaction()==null && connection.getSCN()==null) { + mutationState.startTransaction(); + timestamp = convertToMillisecods(mutationState.getTransaction().getReadPointer()); + connection.commit(); + } + return timestamp; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 7c36245..fbaa01e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -151,7 +151,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { String query = "CREATE TABLE t1 (k integer not null primary key, a.k decimal, b.k decimal)"; conn.createStatement().execute(query); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PColumn c = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), "T1")).getColumn("K"); + PColumn c = pconn.getTable(new PTableKey(pconn.getTenantId(), "T1")).getColumn("K"); assertTrue(SchemaUtil.isPKColumn(c)); } finally { conn.close(); @@ -1162,7 +1162,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException { PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - assertEquals(expectedValue, pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows()); + assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isImmutableRows()); } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java index 7a0bac6..5b457e7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/ViewCompilerTest.java @@ -31,20 +31,20 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ViewType; -import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; public class ViewCompilerTest extends BaseConnectionlessQueryTest { @Test public void testViewTypeCalculation() throws Exception { - assertViewType(new String[] { + assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] { "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 = 1 AND k2 = 'foo'", "CREATE VIEW v2 AS SELECT * FROM t WHERE k2 = 'foo'", "CREATE VIEW v3 AS SELECT * FROM t WHERE v = 'bar'||'bas'", "CREATE VIEW v4 AS SELECT * FROM t WHERE 'bar'=v and 5+3/2 = k1", }, ViewType.UPDATABLE); - assertViewType(new String[] { + assertViewType(new String[] {"V1","V2","V3","V4"}, new String[] { "CREATE VIEW v1 AS SELECT * FROM t WHERE k1 < 1 AND k2 = 'foo'", "CREATE VIEW v2 AS SELECT * FROM t WHERE substr(k2,0,3) = 'foo'", "CREATE VIEW v3 AS SELECT * FROM t WHERE v = TO_CHAR(CURRENT_DATE())", @@ -52,28 +52,27 @@ public class ViewCompilerTest extends BaseConnectionlessQueryTest { }, ViewType.READ_ONLY); } - public void assertViewType(String[] views, ViewType viewType) throws Exception { + public void assertViewType(String[] viewNames, String[] viewDDLs, ViewType viewType) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); String ct = "CREATE TABLE t (k1 INTEGER NOT NULL, k2 VARCHAR, v VARCHAR, CONSTRAINT pk PRIMARY KEY (k1,k2))"; conn.createStatement().execute(ct); - for (String view : views) { - conn.createStatement().execute(view); + for (String viewDDL : viewDDLs) { + conn.createStatement().execute(viewDDL); } StringBuilder buf = new StringBuilder(); int count = 0; - for (PTable table : conn.getMetaDataCache()) { - if (table.getType() == PTableType.VIEW) { - assertEquals(viewType, table.getViewType()); - conn.createStatement().execute("DROP VIEW " + table.getName().getString()); - buf.append(' '); - buf.append(table.getName().getString()); - count++; - } + for (String view : viewNames) { + PTable table = conn.getTable(new PTableKey(null, view)); + assertEquals(viewType, table.getViewType()); + conn.createStatement().execute("DROP VIEW " + table.getName().getString()); + buf.append(' '); + buf.append(table.getName().getString()); + count++; } - assertEquals("Expected " + views.length + ", but got " + count + ":"+ buf.toString(), views.length, count); + assertEquals("Expected " + viewDDLs.length + ", but got " + count + ":"+ buf.toString(), viewDDLs.length, count); } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java index 29e14bf..cfd76bc 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.SortedMap; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.end2end.Shadower; @@ -646,7 +647,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest { } stmt.execute(); - final PTable table = conn.unwrap(PhoenixConnection.class).getMetaDataCache().getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC")); + final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC")); GuidePostsInfo info = new GuidePostsInfo(0,Collections.<byte[]> emptyList(), 0l); for (byte[] gp : guidePosts) { info.addGuidePost(gp, 1000); @@ -670,7 +671,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest { return table.getTimeStamp()+1; } }); - conn.unwrap(PhoenixConnection.class).addTable(tableWithStats); + conn.unwrap(PhoenixConnection.class).addTable(tableWithStats, System.currentTimeMillis()); String query = "SELECT count(1) cnt,\n" + " coalesce(SUM(impressions), 0.0) AS \"impressions\",\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 6e1c28f..e941ceb 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -100,8 +100,8 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { try { conn.createStatement().execute("CREATE INDEX idx ON " + fullTableName + "(" + indexColumns + ") " + (includeColumns.isEmpty() ? "" : "INCLUDE (" + includeColumns + ") ") + (indexProps.isEmpty() ? "" : indexProps)); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)); - PTable index = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(),fullIndexName)); + PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)); + PTable index = pconn.getTable(new PTableKey(pconn.getTenantId(),fullIndexName)); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); table.getIndexMaintainers(ptr, pconn); List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder); http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java index abaaeb5..452ea4d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java @@ -123,7 +123,7 @@ public class BaseConnectionlessQueryTest extends BaseTest { props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP)); PhoenixConnection conn = DriverManager.getConnection(PHOENIX_CONNECTIONLESS_JDBC_URL, props).unwrap(PhoenixConnection.class); try { - PTable table = conn.getMetaDataCache().getTable(new PTableKey(null, ATABLE_NAME)); + PTable table = conn.getTable(new PTableKey(null, ATABLE_NAME)); ATABLE = table; ORGANIZATION_ID = new ColumnRef(new TableRef(table), table.getColumn("ORGANIZATION_ID").getPosition()).newColumnExpression(); ENTITY_ID = new ColumnRef(new TableRef(table), table.getColumn("ENTITY_ID").getPosition()).newColumnExpression(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 3d6cb0c..da965c8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -142,6 +142,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.internal.utils.Networks; import org.apache.twill.zookeeper.RetryStrategies; import org.apache.twill.zookeeper.ZKClientService; import org.apache.twill.zookeeper.ZKClientServices; @@ -498,10 +499,9 @@ public abstract class BaseTest { config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 600); -// config.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, ConnectionInfo.getZookeeperConnectionString(getUrl())); -// config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, "/tmp"); ConnectionInfo connInfo = ConnectionInfo.create(getUrl()); zkClient = ZKClientServices.delegate( http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index cee1054..21e1f62 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -105,7 +105,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { Connection conn = DriverManager.getConnection(getUrl(), props); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)); + PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), TABLE_NAME)); TableRef tableRef = new TableRef(table); List<HRegionLocation> regions = pconn.getQueryServices().getAllTableRegions(tableRef.getTable().getPhysicalName().getBytes()); List<KeyRange> ranges = getSplits(tableRef, scan, regions, scanRanges); http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java index 9379ef3..405a73c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/PMetaDataImplTest.java @@ -32,7 +32,7 @@ public class PMetaDataImplTest { private static PMetaData addToTable(PMetaData metaData, String name, int size) throws SQLException { PTable table = new PSizedTable(new PTableKey(null,name), size); - return metaData.addTable(table); + return metaData.addTable(table, System.currentTimeMillis()); } private static PMetaData removeFromTable(PMetaData metaData, String name) throws SQLException { @@ -40,7 +40,7 @@ public class PMetaDataImplTest { } private static PTable getFromTable(PMetaData metaData, String name) throws TableNotFoundException { - return metaData.getTable(new PTableKey(null,name)); + return metaData.getTableRef(new PTableKey(null,name)).getTable(); } private static void assertNames(PMetaData metaData, String... names) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java index bcd08f0..6977103 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeySchemaTest.java @@ -56,7 +56,7 @@ public class RowKeySchemaTest extends BaseConnectionlessQueryTest { String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName)); conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")) " + (dataProps.isEmpty() ? "" : dataProps) ); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)); + PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)); conn.close(); StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES("); for (int i = 0; i < values.length; i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java index 23ec4bf..7ab72d6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/RowKeyValueAccessorTest.java @@ -52,7 +52,7 @@ public class RowKeyValueAccessorTest extends BaseConnectionlessQueryTest { String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(schemaName),SchemaUtil.normalizeIdentifier(tableName)); conn.createStatement().execute("CREATE TABLE " + fullTableName + "(" + dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")) " + (dataProps.isEmpty() ? "" : dataProps) ); PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getMetaDataCache().getTable(new PTableKey(pconn.getTenantId(), fullTableName)); + PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)); conn.close(); StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES("); for (int i = 0; i < values.length; i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 9fbb8c9..bead2df 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -25,14 +25,17 @@ import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PAR import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -567,5 +570,27 @@ public class TestUtil { analyzeTable(conn, tableName); conn.close(); } + + public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException { + // insert row + stmt.setString(1, "varchar" + String.valueOf(i)); + stmt.setString(2, "char" + String.valueOf(i)); + stmt.setInt(3, i); + stmt.setLong(4, i); + stmt.setBigDecimal(5, new BigDecimal(i*0.5d)); + Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY); + stmt.setDate(6, date); + } + + public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException { + assertTrue(rs.next()); + assertEquals(rs.getString(1), "varchar" + String.valueOf(i)); + assertEquals(rs.getString(2), "char" + String.valueOf(i)); + assertEquals(rs.getInt(3), i); + assertEquals(rs.getInt(4), i); + assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d)); + Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY); + assertEquals(rs.getDate(6), date); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-flume/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml index 731955e..6c248b1 100644 --- a/phoenix-flume/pom.xml +++ b/phoenix-flume/pom.xml @@ -165,6 +165,13 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> </dependency> + <dependency> + <groupId>co.cask.tephra</groupId> + <artifactId>tephra-core</artifactId> + <type>test-jar</type> + <version>${tephra.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/phoenix-pig/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-pig/pom.xml b/phoenix-pig/pom.xml index ea83513..530236c 100644 --- a/phoenix-pig/pom.xml +++ b/phoenix-pig/pom.xml @@ -143,6 +143,13 @@ <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> + <dependency> + <groupId>co.cask.tephra</groupId> + <artifactId>tephra-core</artifactId> + <type>test-jar</type> + <version>${tephra.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/phoenix/blob/92ee51a0/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9ace160..afb6df3 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ <htrace.version>2.04</htrace.version> <collections.version>3.2.1</collections.version> <jodatime.version>2.3</jodatime.version> + <tephra.version>0.6.1</tephra.version> <!-- Test Dependencies --> <mockito-all.version>1.8.5</mockito-all.version>
