Repository: phoenix Updated Branches: refs/heads/encodecolumns 3909c633c -> c7a425df4
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 99ba25b..54e2861 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -33,7 +33,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; @@ -41,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_COUNTER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; @@ -113,6 +114,7 @@ import java.util.Iterator; 
import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -121,11 +123,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; @@ -209,6 +213,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -273,6 +278,13 @@ public class MetaDataClient { LINK_TYPE + "," + PARENT_TENANT_ID + " " + PVarchar.INSTANCE.getSqlTypeName() + // Dynamic column for now to prevent schema change ") VALUES (?, ?, ?, ?, ?, ?)"; + private static final String UPDATE_ENCODED_COLUMN_COUNT = + "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + + TABLE_SCHEM + "," + + TABLE_NAME + "," + + COLUMN_FAMILY + "," + + ENCODED_COLUMN_QUALIFIER_COUNTER + + ") VALUES (?, ?, ?, ?)"; private static final String INCREMENT_SEQ_NUM = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + TENANT_ID + "," + @@ -326,7 +338,7 @@ public class MetaDataClient { PK_NAME + "," + // write this both in the column and table rows for access by metadata APIs KEY_SEQ + "," + COLUMN_DEF + "," + - COLUMN_QUALIFIER + "," + + 
ENCODED_COLUMN_QUALIFIER + "," + IS_ROW_TIMESTAMP + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private static final String INSERT_COLUMN_ALTER_TABLE = @@ -349,7 +361,7 @@ public class MetaDataClient { PK_NAME + "," + // write this both in the column and table rows for access by metadata APIs KEY_SEQ + "," + COLUMN_DEF + ", " + - COLUMN_QUALIFIER + + ENCODED_COLUMN_QUALIFIER + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private static final String UPDATE_COLUMN_POSITION = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\" ( " + @@ -779,7 +791,7 @@ public class MetaDataClient { argUpsert.execute(); } - private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, Map<String, Integer> nextColumnQualifiers) throws SQLException { + private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, Map<String, Integer> nextEncodedColumnQualifiers) throws SQLException { try { ColumnName columnDefName = def.getColumnDefName(); SortOrder sortOrder = def.getSortOrder(); @@ -828,13 +840,13 @@ public class MetaDataClient { isNull = false; } Integer columnQualifier = null; - if (!isPK && nextColumnQualifiers != null) { - columnQualifier = nextColumnQualifiers.get(familyName.getString()); + if (!isPK && nextEncodedColumnQualifiers != null) { + columnQualifier = nextEncodedColumnQualifiers.get(familyName.getString()); if (columnQualifier == null) { // We use columnQualifier 0 for the special empty key value. 
columnQualifier = 1; } - nextColumnQualifiers.put(familyName.toString(), columnQualifier + 1); + nextEncodedColumnQualifiers.put(familyName.toString(), columnQualifier + 1); } PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(), def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifier); @@ -1823,7 +1835,7 @@ public class MetaDataClient { columns = newArrayListWithExpectedSize(colDefs.size()); pkColumns = newLinkedHashSetWithExpectedSize(colDefs.size() + 1); // in case salted } - + // Don't add link for mapped view, as it just points back to itself and causes the drop to // fail because it looks like there's always a view associated with it. if (!physicalNames.isEmpty()) { @@ -1863,33 +1875,72 @@ public class MetaDataClient { int pkPositionOffset = pkColumns.size(); int position = positionOffset; - StorageScheme storageScheme = null; - Map<String, Integer> nextColumnQualifiers = null; // this would be null for tables created for columns with storage scheme != ENCODED_COLUMN_NAMES + StorageScheme storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; + Map<String, Integer> nextCQCounters = null; + Map<String, Integer> updatedPhysicalTableCQCounters = null; + PTable viewPhysicalTable = null; + //TODO: samarth what about local indexes. if (SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) { // System tables have hard-coded column qualifiers. So we can't use column encoding for them. storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; - } else if (parent != null && tableType == PTableType.VIEW) { - // We can't control what column qualifiers are used in HTable mapped to Phoenix views. So - // we are not able to encode column names. 
- if (viewType != null && viewType == MAPPED) { + } else if (tableType == PTableType.VIEW) { + /* + * We can't control what column qualifiers are used in HTable mapped to Phoenix views. So we are not + * able to encode column names. + */ + if (viewType == MAPPED) { storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; } else { - // for regular phoenix views, use the storage scheme of the parent since they all share the parent's - // HTable. - storageScheme = parent.getStorageScheme(); + /* + * For regular phoenix views, use the storage scheme of the physical table since they all share + * the same HTable. Views always use the base table's column qualifier counters for doling out + * encoded column qualifiers. + */ + viewPhysicalTable = connection.getTable(new PTableKey(null, physicalNames.get(0).getString())); + storageScheme = viewPhysicalTable.getStorageScheme(); if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) { - nextColumnQualifiers = SchemaUtil.getNextEncodedColumnQualifiers(parent); + nextCQCounters = viewPhysicalTable.getEncodedCQCounters(); + updatedPhysicalTableCQCounters = Maps.newHashMapWithExpectedSize(colDefs.size()); } } } else { - // New indexes on existing tables can have encoded column names. But unfortunately, due to - // backward compatibility reasons, we aren't able to change IndexMaintainer and the state - // that is serialized in it. Because of this we are forced to have the indexes inherit the - // storage scheme of the parent data tables. Otherwise, we always attempt to create tables - // with encoded column names. - storageScheme = parent != null ? parent.getStorageScheme() : StorageScheme.ENCODED_COLUMN_NAMES; + /* + * New indexes on existing tables can have encoded column names. But unfortunately, due to backward + * compatibility reasons, we aren't able to change IndexMaintainer and the state that is serialized in + * it. 
Because of this we are forced to have the indexes inherit the storage scheme of the parent data + * tables. Otherwise, we always attempt to create tables with encoded column names. + * + * Also of note is the case with shared indexes i.e. local indexes and view indexes. In these cases, + * column qualifiers for covered columns don't have to be unique because rows of the logical indexes are + * partitioned by virtue of the indexId present in the row key. As such, different shared indexes can use + * potentially overlapping column qualifiers. + * + * If the hbase table already exists, then possibly encoded or non-encoded column qualifiers already exist. + * In this case we proceed with the non-encoded column qualifier scheme. If the phoenix table already exists + * then we rely on the PTable, with appropriate storage scheme, returned in the MetaDataMutationResult to be updated + * in the client cache. If it doesn't, then the non-encoded column qualifier scheme works because we cannot control + * the column qualifiers that were used when populating the hbase table. 
+ */ + byte[] tableNameBytes = SchemaUtil.getTableNameAsBytes(schemaName, tableName); + boolean hbaseTableAlreadyExists = true; + try (HBaseAdmin admin = connection.getQueryServices().getAdmin()) { + try { + admin.getTableDescriptor(tableNameBytes); + } catch (org.apache.hadoop.hbase.TableNotFoundException e) { + hbaseTableAlreadyExists = false; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (parent != null) { + storageScheme = parent.getStorageScheme(); + } else if (hbaseTableAlreadyExists) { + storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; + } else { + storageScheme = StorageScheme.ENCODED_COLUMN_NAMES; + } if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) { - nextColumnQualifiers = Maps.newHashMapWithExpectedSize(colDefs.size() - pkColumns.size()); + nextCQCounters = Maps.newHashMapWithExpectedSize(colDefs.size() - pkColumns.size()); } } @@ -1911,7 +1962,11 @@ public class MetaDataClient { .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException(); } } - PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, nextColumnQualifiers); + PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, nextCQCounters); + String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null; + if (updatedPhysicalTableCQCounters != null && cf != null && EncodedColumnsUtil.hasEncodedColumnName(column)) { + updatedPhysicalTableCQCounters.put(cf, nextCQCounters.get(cf)); + } if (SchemaUtil.isPKColumn(column)) { // TODO: remove this constraint? 
if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) { @@ -1943,9 +1998,45 @@ public class MetaDataClient { .build().buildException(); } if (column.getFamilyName() != null) { - familyNames.put(column.getFamilyName().getString(),column.getFamilyName()); + familyNames.put(cf,column.getFamilyName()); + } + } + + if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) { + // Store the encoded column counter for each column family for phoenix entities that have their own hbase + // tables i.e. base tables and indexes. + Map<String, Integer> mapToUse = tableType == VIEW ? updatedPhysicalTableCQCounters : nextCQCounters; + if (tableType != VIEW && nextCQCounters.isEmpty()) { + // Case when a table or index has only pk columns. + nextCQCounters.put(defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultFamilyName, 1); + } + if (mapToUse != null) { + PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNT); + String schemaNameToUse = tableType == VIEW ? viewPhysicalTable.getSchemaName().getString() : schemaName; + String tableNameToUse = tableType == VIEW ? viewPhysicalTable.getTableName().getString() : tableName; + for (Entry<String, Integer> entry : mapToUse.entrySet()) { + String familyName = entry.getKey(); + Integer nextQualifier = entry.getValue(); + linkStatement.setString(1, schemaNameToUse); + linkStatement.setString(2, tableNameToUse); + linkStatement.setString(3, familyName); + linkStatement.setInt(4, nextQualifier); + linkStatement.execute(); + } + + // When a view adds its own columns, then we need to increase the sequence number of the base table + // too since we want clients to get the latest PTable of the base table. 
+ if (tableType == VIEW && updatedPhysicalTableCQCounters != null && !updatedPhysicalTableCQCounters.isEmpty()) { + PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); + incrementStatement.setString(1, null); + incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString()); + incrementStatement.setString(3, viewPhysicalTable.getTableName().getString()); + incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1); + incrementStatement.execute(); + } } } + // We need a PK definition for a TABLE or mapped VIEW if (!isPK && pkColumnsNames.isEmpty() && tableType != PTableType.VIEW && viewType != ViewType.MAPPED) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING) @@ -2029,7 +2120,7 @@ public class MetaDataClient { Collections.<PTable>emptyList(), isImmutableRows, Collections.<PName>emptyList(), defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), null, - Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, StorageScheme.NON_ENCODED_COLUMN_NAMES); + Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, StorageScheme.NON_ENCODED_COLUMN_NAMES, ImmutableMap.<String, Integer>of()); connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP); } else if (tableType == PTableType.INDEX && indexId == null) { if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) { @@ -2194,12 +2285,14 @@ public class MetaDataClient { throw new ConcurrentTableMutationException(schemaName, tableName); default: PName newSchemaName = PNameFactory.newName(schemaName); + // Views always rely on the parent table's map to dole out encoded column qualifiers. + Map<String, Integer> qualifierMapToBe = tableType == PTableType.VIEW ? 
ImmutableMap.<String, Integer>of() : nextCQCounters; PTable table = PTableImpl.makePTable( tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(), PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns, dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows, physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType, - indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, storageScheme); + indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, storageScheme, qualifierMapToBe); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); return table; @@ -2752,8 +2845,16 @@ public class MetaDataClient { List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnDefs.size()); Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>(); Set<String> families = new LinkedHashSet<>(); - Map<String, Integer> nextColumnQualifiers = SchemaUtil.getNextEncodedColumnQualifiers(table); + PTableType tableType = table.getType(); + PTable tableForCQCounters = null; + Map<String, Integer> cqCountersToUse = null; + Map<String, Integer> cfWithUpdatedCQCounters = null; if (columnDefs.size() > 0 ) { + //FIXME: samarth change this to fetch table from server if client cache doesn't have it. What about local indexes? + //TODO: samarth should these be guarded by storage scheme check. Better to have the map always available. immutable empty for views and non encoded. + tableForCQCounters = tableType == PTableType.VIEW ? 
connection.getTable(new PTableKey(null, table.getPhysicalName().getString())) : table; + cqCountersToUse = tableForCQCounters.getEncodedCQCounters(); + cfWithUpdatedCQCounters = cqCountersToUse != null ? Maps.<String, Integer>newHashMapWithExpectedSize(columnDefs.size()) : null; try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE)) { short nextKeySeq = SchemaUtil.getMaxKeySeq(table); for( ColumnDef colDef : columnDefs) { @@ -2773,11 +2874,15 @@ public class MetaDataClient { throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY) .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException(); } - PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, nextColumnQualifiers); + PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, cqCountersToUse); + String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null; + if (cfWithUpdatedCQCounters != null && cf != null && EncodedColumnsUtil.hasEncodedColumnName(column)) { + cfWithUpdatedCQCounters.put(cf, cqCountersToUse.get(cf)); + } columns.add(column); String pkName = null; Short keySeq = null; - + // TODO: support setting properties on other families? if (column.getFamilyName() == null) { ++numPkColumnsAdded; @@ -2789,7 +2894,7 @@ public class MetaDataClient { colFamiliesForPColumnsToBeAdded.add(column.getFamilyName() == null ? 
null : column.getFamilyName().getString()); addColumnMutation(schemaName, tableName, column, colUpsert, null, pkName, keySeq, table.getBucketNum() != null); } - + // Add any new PK columns to end of index PK if (numPkColumnsAdded>0) { // create PK column list that includes the newly created columns @@ -2811,7 +2916,7 @@ public class MetaDataClient { ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName())); Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, ++pkSlotPosition)); ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp()); - PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, nextColumnQualifiers); + PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, cqCountersToUse); addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? 
null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null); } } @@ -2847,6 +2952,7 @@ public class MetaDataClient { tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); connection.rollback(); } + long seqNum = table.getSequenceNumber(); if (changingPhoenixTableProperty || columnDefs.size() > 0) { seqNum = incrementTableSeqNum(table, statement.getTableType(), columnDefs.size(), isTransactional, updateCacheFrequency, isImmutableRows, disableWAL, multiTenant, storeNulls); @@ -2858,6 +2964,31 @@ public class MetaDataClient { Collections.reverse(tableMetaData); // Add column metadata afterwards, maintaining the order so columns have more predictable ordinal position tableMetaData.addAll(columnMetaData); + //TODO: samarth I am not sure this is going to work on server side. But for now lets add these mutations here. + if (cfWithUpdatedCQCounters != null && !cfWithUpdatedCQCounters.isEmpty()) { + PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNT); + for (Entry<String, Integer> entry : cfWithUpdatedCQCounters.entrySet()) { + String familyName = entry.getKey(); + Integer nextQualifier = entry.getValue(); + linkStatement.setString(1, tableForCQCounters.getSchemaName().getString()); + linkStatement.setString(2, tableForCQCounters.getTableName().getString()); + linkStatement.setString(3, familyName); + linkStatement.setInt(4, nextQualifier); + linkStatement.execute(); + } + // When a view adds its own columns, then we need to increase the sequence number of the base table + // too since we want clients to get the latest PTable of the base table. 
+ if (tableType == VIEW) { + PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM); + incrementStatement.setString(1, null); //TODO: samarth verify that tenant id should be null here + incrementStatement.setString(2, tableForCQCounters.getSchemaName().getString()); + incrementStatement.setString(3, tableForCQCounters.getTableName().getString()); + incrementStatement.setLong(4, tableForCQCounters.getSequenceNumber() + 1); + incrementStatement.execute(); + } + tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond()); + connection.rollback(); + } byte[] family = families.size() > 0 ? families.iterator().next().getBytes() : null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index f54f87e..c6c66e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -18,6 +18,7 @@ package org.apache.phoenix.schema; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -130,7 +131,7 @@ public interface PTable extends PMetaDataEntity { * Link from a view to its parent table */ PARENT_TABLE((byte)3); - + private final byte[] byteValue; private final byte serializedValue; @@ -379,4 +380,5 @@ public interface PTable extends PMetaDataEntity { int getRowTimestampColPos(); long getUpdateCacheFrequency(); StorageScheme getStorageScheme(); + Map<String, Integer> getEncodedCQCounters(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index e517fcf..d8b76c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos; import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos.PGuidePosts; import org.apache.phoenix.coprocessor.generated.PTableProtos; +import org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedColumnQualifierCounter; import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -143,6 +145,7 @@ public class PTableImpl implements PTable { private int rowTimestampColPos; private long updateCacheFrequency; private StorageScheme storageScheme; + private Map<String, Integer> encodedCQCounters; public PTableImpl() { this.indexes = Collections.emptyList(); @@ -213,7 +216,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(), indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), 
table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException { @@ -222,7 +225,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException { @@ -231,7 +234,7 @@ public class PTableImpl implements PTable { sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException { @@ -240,7 +243,7 @@ public class PTableImpl implements PTable { sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, @@ -250,7 +253,7 @@ public class PTableImpl implements PTable { sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), - 
table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException { @@ -260,7 +263,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException { @@ -270,7 +273,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), - table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getBaseColumnCount(), 
rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException { @@ -280,7 +283,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats, - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme()); + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme(), table.getEncodedCQCounters()); } public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, @@ -289,12 +292,12 @@ public class PTableImpl implements PTable { boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, - long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException { + long indexDisableTimestamp, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, 
storeNulls, viewType, viewIndexId, indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional, - updateCacheFrequency,indexDisableTimestamp, storageScheme); + updateCacheFrequency,indexDisableTimestamp, storageScheme, encodedColumnQualifierCounters); } public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, @@ -303,12 +306,13 @@ public class PTableImpl implements PTable { boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, - @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp, StorageScheme storageScheme) + @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp, StorageScheme storageScheme, + Map<String, Integer> encodedColumnQualifierCounters) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, - indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme); + indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme, encodedColumnQualifierCounters); } private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, @@ -316,11 +320,11 @@ public class PTableImpl implements PTable { PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName 
defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, - PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException { + PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException { init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, - isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme); + isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme, encodedColumnQualifierCounters); } @Override @@ -353,7 +357,7 @@ public class PTableImpl implements PTable { PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, - IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException { + IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme, Map<String, Integer> encodedCQCounters) 
throws SQLException { Preconditions.checkNotNull(schemaName); Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE + @@ -525,9 +529,9 @@ public class PTableImpl implements PTable { for (PName name : this.physicalNames) { estimatedSize += name.getEstimatedSize(); } - this.estimatedSize = estimatedSize; this.baseColumnCount = baseColumnCount; + this.encodedCQCounters = encodedCQCounters; } @Override @@ -1042,7 +1046,8 @@ public class PTableImpl implements PTable { public IndexType getIndexType() { return indexType; } - + + //FIXME: samarth change the proto here /** * Construct a PTable instance from ProtoBuffered PTable instance * @param table @@ -1142,13 +1147,22 @@ public class PTableImpl implements PTable { if (table.hasStorageScheme()) { storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]); } + int numCounters = table.getEncodedColumnQualifierCountersCount(); + Map<String, Integer> encodedColumnQualifierCounters = null; + if (numCounters > 0) { + encodedColumnQualifierCounters = Maps.newHashMapWithExpectedSize(numCounters); + for (int i = 0; i < numCounters; i++) { + PTableProtos.EncodedColumnQualifierCounter c = table.getEncodedColumnQualifierCounters(i); + encodedColumnQualifierCounters.put(c.getFamilyName(), c.getCounter()); + } + } try { PTableImpl result = new PTableImpl(); result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName, (bucketNum == NO_SALTING) ? 
null : bucketNum, columns, stats, schemaName,dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, - isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme); + isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme, encodedColumnQualifierCounters); return result; } catch (SQLException e) { throw new RuntimeException(e); // Impossible @@ -1242,6 +1256,15 @@ public class PTableImpl implements PTable { if (table.getStorageScheme() != null) { builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getStorageScheme().getSerializedValue()})); } + Map<String, Integer> encodedColumnQualifierCounters = table.getEncodedCQCounters(); + if (encodedColumnQualifierCounters != null) { + for (Entry<String, Integer> entry : encodedColumnQualifierCounters.entrySet()) { + EncodedColumnQualifierCounter.Builder b = EncodedColumnQualifierCounter.newBuilder(); + b.setFamilyName(entry.getKey()); + b.setCounter(entry.getValue()); + builder.addEncodedColumnQualifierCounters(b.build()); + } + } return builder.build(); } @@ -1284,4 +1307,9 @@ public class PTableImpl implements PTable { public StorageScheme getStorageScheme() { return storageScheme; } + + @Override + public Map<String, Integer> getEncodedCQCounters() { + return encodedCQCounters; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index 034e7ac..2c94d1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -903,22 +903,28 @@ public class 
SchemaUtil { /** * Return a map of column family -> next column qualifier number to use. */ - public static Map<String, Integer> getNextEncodedColumnQualifiers(PTable table) { - if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { - Map<String, Integer> map = Maps.newHashMapWithExpectedSize(table.getColumns().size()); - int max = 0; - for (PColumnFamily f : table.getColumnFamilies()) { - for (PColumn column : f.getColumns()) { - if (column.getEncodedColumnQualifier() > max) { - max = column.getEncodedColumnQualifier(); - } - } - // column qualifiers start with 1. - map.put(f.getName().getString(), max + 1); - } - return map; - } - return null; - } +// public static Map<String, Integer> getNextEncodedColumnQualifiers(PTable table) { +// if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { +// Map<String, Integer> map = Maps.newHashMapWithExpectedSize(table.getColumns().size()); +// int max = 0; +// for (PColumnFamily f : table.getColumnFamilies()) { +// for (PColumn column : f.getColumns()) { +// if (column.getEncodedColumnQualifier() > max) { +// max = column.getEncodedColumnQualifier(); +// } +// } +// // column qualifiers start with 1. +// map.put(f.getName().getString(), max + 1); +// } +// // When a table has only primary key columns table.getColumnFamilies() will be empty. +// // In that case, populate the map with the next column qualifier (1) for the default column +// // family of the table. 
+// if (map.isEmpty()) { +// map.put(SchemaUtil.getEmptyColumnFamilyAsString(table), 1); +// } +// return map; +// } +// return null; +// } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java index 6b4cd9b..5a9c713 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@ -233,7 +233,7 @@ public class CorrelatePlanTest { PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable>emptyList(), false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, true, false, 0, 0L, null); + null, null, true, false, 0, 0L, null, null); TableRef sourceTable = new TableRef(pTable); List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); for (PColumn column : sourceTable.getTable().getColumns()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7a425df/phoenix-protocol/src/main/PTable.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto index ae34695..21a36dd 100644 --- a/phoenix-protocol/src/main/PTable.proto +++ b/phoenix-protocol/src/main/PTable.proto @@ -92,4 +92,10 @@ message PTable { optional int64 updateCacheFrequency = 28; optional int64 indexDisableTimestamp = 29; optional bytes storageScheme = 30; + repeated EncodedColumnQualifierCounter encodedColumnQualifierCounters = 31; } + +message EncodedColumnQualifierCounter { + required string familyName = 1; + required int32 counter = 2; +} \ No 
newline at end of file
