Repository: phoenix
Updated Branches:
refs/heads/3.0 712b4e672 -> 952c8bca8
PHOENIX-1401 SYSTEM.SEQUENCE table is not pre-split as expected
Conflicts:
phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55eb01da
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55eb01da
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55eb01da
Branch: refs/heads/3.0
Commit: 55eb01da813af30674a144f1fa9a5ad22e6bffb8
Parents: 712b4e6
Author: James Taylor <[email protected]>
Authored: Tue Nov 4 10:36:28 2014 -0800
Committer: James Taylor <[email protected]>
Committed: Tue Nov 4 23:01:30 2014 -0800
----------------------------------------------------------------------
.../phoenix/coprocessor/MetaDataProtocol.java | 4 +-
.../query/ConnectionQueryServicesImpl.java | 12 ++--
.../apache/phoenix/schema/MetaDataClient.java | 2 +-
.../schema/TableAlreadyExistsException.java | 16 +++++-
.../org/apache/phoenix/util/UpgradeUtil.java | 58 ++++++++++++++++++--
5 files changed, 80 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55eb01da/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 76fb129..60bd458 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -57,13 +57,13 @@ import com.google.common.collect.Lists;
public interface MetaDataProtocol extends CoprocessorProtocol {
public static final int PHOENIX_MAJOR_VERSION = 3;
public static final int PHOENIX_MINOR_VERSION = 2;
- public static final int PHOENIX_PATCH_NUMBER = 0;
+ public static final int PHOENIX_PATCH_NUMBER = 1;
public static final int PHOENIX_VERSION =
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
public static final long MIN_TABLE_TIMESTAMP = 0;
// Incremented from 3 to 4 to salt the sequence table in 3.2/4.2
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP
+ 4;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP
+ 5;
public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
// TODO: pare this down to minimum, as we don't need duplicates for both
table and column errors, nor should we need
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55eb01da/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f6791f6..c53c19b 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -1353,12 +1353,16 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
} catch (TableAlreadyExistsException e) {
// This will occur if we have an older
SYSTEM.SEQUENCE, so we need to update it to include
// any new columns we've added.
- if
(UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets)) {
+ if
(UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) {
metaConnection.removeTable(null,
-
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
-
PhoenixDatabaseMetaData.TYPE_SEQUENCE,
+
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
+
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
- clearCache();
+
clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
+
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME_BYTES,
+
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES,
+
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
+
clearTableRegionCache(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
}
nSequenceSaltBuckets = nSaltBuckets;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55eb01da/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index a460f9f..9f1ace4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1419,7 +1419,7 @@ public class MetaDataClient {
case TABLE_ALREADY_EXISTS:
connection.addTable(result.getTable());
if (!statement.ifNotExists()) {
- throw new TableAlreadyExistsException(schemaName,
tableName);
+ throw new TableAlreadyExistsException(schemaName,
tableName, result.getTable());
}
return null;
case PARENT_TABLE_NOT_FOUND:
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55eb01da/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
index 466b4a4..2b4eaeb 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableAlreadyExistsException.java
@@ -35,16 +35,26 @@ public class TableAlreadyExistsException extends
SQLException {
private static SQLExceptionCode code =
SQLExceptionCode.TABLE_ALREADY_EXIST;
private final String schemaName;
private final String tableName;
+ private final PTable table;
public TableAlreadyExistsException(String schemaName, String tableName) {
- this(schemaName, tableName, null);
+ this(schemaName, tableName, null, null);
}
public TableAlreadyExistsException(String schemaName, String tableName,
String msg) {
+ this(schemaName, tableName, msg, null);
+ }
+
+ public TableAlreadyExistsException(String schemaName, String tableName,
PTable table) {
+ this(schemaName, tableName, null, table);
+ }
+
+ public TableAlreadyExistsException(String schemaName, String tableName,
String msg, PTable table) {
super(new
SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName).setMessage(msg).build().toString(),
code.getSQLState(), code.getErrorCode());
this.schemaName = schemaName;
this.tableName = tableName;
+ this.table = table;
}
public String getTableName() {
@@ -54,4 +64,8 @@ public class TableAlreadyExistsException extends SQLException
{
public String getSchemaName() {
return schemaName;
}
+
+ public PTable getTable() {
+ return table;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55eb01da/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 5f448cb..1c6ff18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -37,6 +38,7 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SaltingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +52,35 @@ public class UpgradeUtil {
private UpgradeUtil() {
}
- public static boolean upgradeSequenceTable(PhoenixConnection conn, int
nSaltBuckets) throws SQLException {
+ private static void preSplitSequenceTable(PhoenixConnection conn, int
nSaltBuckets) throws SQLException {
+ HBaseAdmin admin = conn.getQueryServices().getAdmin();
+ try {
+ if (nSaltBuckets <= 0) {
+ return;
+ }
+ logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets
+ "-ways");
+ for (int i = 0; i < nSaltBuckets; i++) {
+ logger.info("Pre-splitting SYSTEM.SEQUENCE table for salt
bucket " + i);
+ admin.split(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES,
new byte[] {(byte)i});
+ }
+ logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table");
+ } catch (IOException e) {
+ throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE
table", e);
+ } catch (InterruptedException e) {
+ throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE
table", e);
+ } finally {
+ try {
+ admin.close();
+ } catch (IOException e) {
+ logger.warn("Exception while closing admin during pre-split",
e);
+ }
+ }
+ }
+
+ public static boolean upgradeSequenceTable(PhoenixConnection conn, int
nSaltBuckets, PTable oldTable) throws SQLException {
logger.info("Upgrading SYSTEM.SEQUENCE table");
- byte[] seqTableKey = SchemaUtil.getTableKey(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.TYPE_SEQUENCE);
+ byte[] seqTableKey = SchemaUtil.getTableKey(null,
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA_NAME,
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
HTableInterface sysTable =
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
try {
logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to "
+ SaltingUtil.MAX_BUCKET_NUM);
@@ -68,7 +95,29 @@ public class UpgradeUtil {
if (!sysTable.checkAndPut(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null,
saltPut)) {
-
+ if (oldTable == null) { // Unexpected, but to be safe just run
pre-split code
+ preSplitSequenceTable(conn, nSaltBuckets);
+ return true;
+ }
+ // We can detect upgrade from 4.2.0 -> 4.2.1 based on the
timestamp of the table row
+ if (oldTable.getTimeStamp() ==
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP-1) {
+ byte[] oldSeqNum =
PDataType.LONG.toBytes(oldTable.getSequenceNumber());
+ KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES,
+ MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+
PDataType.LONG.toBytes(oldTable.getSequenceNumber()+1));
+ Put seqNumPut = new Put(seqTableKey);
+ seqNumPut.add(seqNumKV);
+ // Increment TABLE_SEQ_NUM in checkAndPut as semaphore so
that only single client
+ // pre-splits the sequence table.
+ if (sysTable.checkAndPut(seqTableKey,
+ PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES,
oldSeqNum, seqNumPut)) {
+ preSplitSequenceTable(conn, nSaltBuckets);
+ return true;
+ }
+ }
logger.info("SYSTEM.SEQUENCE table has already been upgraded");
return false;
}
@@ -84,7 +133,7 @@ public class UpgradeUtil {
HTableInterface seqTable =
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
boolean committed = false;
- logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
ResultScanner scanner = seqTable.getScanner(scan);
try {
Result result;
@@ -131,6 +180,7 @@ public class UpgradeUtil {
logger.info("Committing last bactch of SYSTEM.SEQUENCE
rows");
seqTable.batch(mutations);
}
+ preSplitSequenceTable(conn, nSaltBuckets);
logger.info("Successfully completed upgrade of
SYSTEM.SEQUENCE");
success = true;
return true;