PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cb60cb4a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cb60cb4a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cb60cb4a

Branch: refs/heads/4.0
Commit: cb60cb4a3b0181df841fef10c0365e197d96b59d
Parents: 37c4d3e
Author: James Taylor <[email protected]>
Authored: Tue Nov 4 10:36:28 2014 -0800
Committer: James Taylor <[email protected]>
Committed: Tue Nov 4 22:49:07 2014 -0800

----------------------------------------------------------------------
 .../phoenix/coprocessor/MetaDataProtocol.java   |  4 +-
 .../query/ConnectionQueryServicesImpl.java      | 12 ++--
 .../apache/phoenix/schema/MetaDataClient.java   |  2 +-
 .../schema/TableAlreadyExistsException.java     | 16 +++++-
 .../org/apache/phoenix/util/UpgradeUtil.java    | 58 ++++++++++++++++++--
 5 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 662bed3..f8349be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -53,14 +53,14 @@ import com.google.protobuf.HBaseZeroCopyByteString;
 public abstract class MetaDataProtocol extends MetaDataService {
 public static final int PHOENIX_MAJOR_VERSION = 4;
 public static final int PHOENIX_MINOR_VERSION = 2;
- public static final int PHOENIX_PATCH_NUMBER = 0;
+ public static final int PHOENIX_PATCH_NUMBER = 1;
 public static final int PHOENIX_VERSION = 
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER); public static final long MIN_TABLE_TIMESTAMP = 0; // Incremented from 3 to 4 to salt the sequence table in 3.2/4.2 - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 4; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 5; public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index aeb9ac2..eab263b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -1539,12 +1539,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (TableAlreadyExistsException e) { // This will occur if we have an older SYSTEM.SEQUENCE, so we need to update it to include // any new columns we've added. 
- if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets)) { + if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { metaConnection.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, - PhoenixDatabaseMetaData.TYPE_SEQUENCE, + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); - clearCache(); + clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES, + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); } nSequenceSaltBuckets = nSaltBuckets; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 5892d14..e5951fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1532,7 +1532,7 @@ public class MetaDataClient { case TABLE_ALREADY_EXISTS: connection.addTable(result.getTable()); if (!statement.ifNotExists()) { - throw new TableAlreadyExistsException(schemaName, tableName); + throw new TableAlreadyExistsException(schemaName, tableName, result.getTable()); } return null; case PARENT_TABLE_NOT_FOUND: http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java index 466b4a4..2b4eaeb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java @@ -35,16 +35,26 @@ public class TableAlreadyExistsException extends SQLException { private static SQLExceptionCode code = SQLExceptionCode.TABLE_ALREADY_EXIST; private final String schemaName; private final String tableName; + private final PTable table; public TableAlreadyExistsException(String schemaName, String tableName) { - this(schemaName, tableName, null); + this(schemaName, tableName, null, null); } public TableAlreadyExistsException(String schemaName, String tableName, String msg) { + this(schemaName, tableName, msg, null); + } + + public TableAlreadyExistsException(String schemaName, String tableName, PTable table) { + this(schemaName, tableName, null, table); + } + + public TableAlreadyExistsException(String schemaName, String tableName, String msg, PTable table) { super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).setMessage(msg).build().toString(), code.getSQLState(), code.getErrorCode()); this.schemaName = schemaName; this.tableName = tableName; + this.table = table; } public String getTableName() { @@ -54,4 +64,8 @@ public class TableAlreadyExistsException extends SQLException { public String getSchemaName() { return schemaName; } + + public PTable getTable() { + return table; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cb60cb4a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index b51b455..21e0631 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -37,6 +38,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SaltingUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,11 +52,36 @@ public class UpgradeUtil { private UpgradeUtil() { } + private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException { + HBaseAdmin admin = conn.getQueryServices().getAdmin(); + try { + if (nSaltBuckets <= 0) { + return; + } + logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways"); + for (int i = 0; i < nSaltBuckets; i++) { + logger.info("Pre-splitting SYSTEM.SEQUENCE table for salt bucket " + i); + admin.split(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES, new byte[] {(byte)i}); + } + logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table"); + } catch (IOException e) { + throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e); + } catch (InterruptedException e) { + throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e); + } finally { + try { + admin.close(); + } catch (IOException e) { + logger.warn("Exception while closing admin during pre-split", e); + } + } + } + @SuppressWarnings("deprecation") - public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException { + public static boolean 
upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException { logger.info("Upgrading SYSTEM.SEQUENCE table"); - byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE); + byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME); HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); try { logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + SaltingUtil.MAX_BUCKET_NUM); @@ -69,7 +96,29 @@ public class UpgradeUtil { if (!sysTable.checkAndPut(seqTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, saltPut)) { - + if (oldTable == null) { // Unexpected, but to be safe just run pre-split code + preSplitSequenceTable(conn, nSaltBuckets); + return true; + } + // We can detect upgrade from 4.2.0 -> 4.2.1 based on the timestamp of the table row + if (oldTable.getTimeStamp() == MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP-1) { + byte[] oldSeqNum = PDataType.LONG.toBytes(oldTable.getSequenceNumber()); + KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey, + PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, + PDataType.LONG.toBytes(oldTable.getSequenceNumber()+1)); + Put seqNumPut = new Put(seqTableKey); + seqNumPut.add(seqNumKV); + // Increment TABLE_SEQ_NUM in checkAndPut as semaphore so that only single client + // pre-splits the sequence table. 
+ if (sysTable.checkAndPut(seqTableKey, + PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, oldSeqNum, seqNumPut)) { + preSplitSequenceTable(conn, nSaltBuckets); + return true; + } + } logger.info("SYSTEM.SEQUENCE table has already been upgraded"); return false; } @@ -85,7 +134,7 @@ public class UpgradeUtil { HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); try { boolean committed = false; - logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows"); + logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows"); ResultScanner scanner = seqTable.getScanner(scan); try { Result result; @@ -132,6 +181,7 @@ public class UpgradeUtil { logger.info("Committing last bactch of SYSTEM.SEQUENCE rows"); seqTable.batch(mutations); } + preSplitSequenceTable(conn, nSaltBuckets); logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE"); success = true; return true;
