Repository: phoenix
Updated Branches:
refs/heads/3.0 3f254dc4d -> b95498c55
PHOENIX-1365 Make sequence salt buckets configurable
Conflicts:
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b95498c5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b95498c5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b95498c5
Branch: refs/heads/3.0
Commit: b95498c55c21160b0ed5c60650f9cfce2510ac96
Parents: 3f254dc
Author: James Taylor <[email protected]>
Authored: Fri Oct 17 12:21:44 2014 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Oct 17 12:40:00 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/index/ViewIndexIT.java | 2 +-
.../apache/phoenix/compile/SequenceManager.java | 3 +-
.../coprocessor/MetaDataEndpointImpl.java | 3 +
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 15 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 1 -
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 23 +--
.../phoenix/query/ConnectionQueryServices.java | 1 +
.../query/ConnectionQueryServicesImpl.java | 37 ++--
.../query/ConnectionlessQueryServicesImpl.java | 14 +-
.../query/DelegateConnectionQueryServices.java | 5 +
.../apache/phoenix/query/QueryConstants.java | 5 +-
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../phoenix/query/QueryServicesOptions.java | 16 +-
.../apache/phoenix/schema/MetaDataClient.java | 3 +-
.../org/apache/phoenix/schema/Sequence.java | 7 +
.../org/apache/phoenix/schema/SequenceKey.java | 8 +-
.../org/apache/phoenix/util/MetaDataUtil.java | 10 +-
.../org/apache/phoenix/util/SchemaUtil.java | 4 +-
.../org/apache/phoenix/util/UpgradeUtil.java | 169 +++++++++++--------
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
.../phoenix/query/QueryServicesTestImpl.java | 12 +-
21 files changed, 214 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
index 2f9f9b6..29eb325 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
@@ -82,7 +82,7 @@ public class ViewIndexIT extends BaseIndexIT {
ResultSet rs = conn2.createStatement().executeQuery("SELECT "
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_NAME
- + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+ + " FROM " +
PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED);
StringBuilder buf = new StringBuilder();
while (rs.next()) {
String schemaName = rs.getString(1);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 03091c4..9ea4245 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -122,7 +122,8 @@ public class SequenceManager {
PName tenantName = statement.getConnection().getTenantId();
String tenantId = tenantName == null ? null : tenantName.getString();
TableName tableName = node.getTableName();
- SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(),
tableName.getTableName());
+ int nSaltBuckets =
statement.getConnection().getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(),
tableName.getTableName(), nSaltBuckets);
SequenceValueExpression expression = sequenceMap.get(key);
if (expression == null) {
int index = sequenceMap.size();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index ee8833b..3277f94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -410,6 +410,9 @@ public class MetaDataEndpointImpl extends
BaseEndpointCoprocessor implements Met
PName pkName = pkNameKv != null ? newPName(pkNameKv.getBuffer(),
pkNameKv.getValueOffset(), pkNameKv.getValueLength()) : null;
KeyValue saltBucketNumKv = tableKeyValues[SALT_BUCKETS_INDEX];
Integer saltBucketNum = saltBucketNumKv != null ?
(Integer)PDataType.INTEGER.getCodec().decodeInt(saltBucketNumKv.getBuffer(),
saltBucketNumKv.getValueOffset(), SortOrder.getDefault()) : null;
+ if (saltBucketNum != null && saltBucketNum.intValue() == 0) {
+ saltBucketNum = null; // Zero salt buckets means not salted
+ }
KeyValue dataTableNameKv = tableKeyValues[DATA_TABLE_NAME_INDEX];
PName dataTableName = dataTableNameKv != null ?
newPName(dataTableNameKv.getBuffer(), dataTableNameKv.getValueOffset(),
dataTableNameKv.getValueLength()) : null;
KeyValue indexStateKv = tableKeyValues[INDEX_STATE_INDEX];
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 84392c0..f382b35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -99,12 +99,12 @@ public class PhoenixDatabaseMetaData implements
DatabaseMetaData, org.apache.pho
public static final int TENANT_ID_INDEX = 0;
public static final String SYSTEM_CATALOG_SCHEMA =
QueryConstants.SYSTEM_SCHEMA_NAME;
+ public static final byte[] SYSTEM_CATALOG_SCHEMA_BYTES =
QueryConstants.SYSTEM_SCHEMA_NAME_BYTES;
public static final String SYSTEM_CATALOG_TABLE = "CATALOG";
+ public static final byte[] SYSTEM_CATALOG_TABLE_BYTES =
Bytes.toBytes(SYSTEM_CATALOG_TABLE);
public static final String SYSTEM_CATALOG = SYSTEM_CATALOG_SCHEMA + ".\""
+ SYSTEM_CATALOG_TABLE + "\"";
- public static final byte[] SYSTEM_CATALOG_SCHEMA_BYTES =
Bytes.toBytes(SYSTEM_CATALOG_TABLE);
- public static final byte[] SYSTEM_CATALOG_TABLE_BYTES =
Bytes.toBytes(SYSTEM_CATALOG_SCHEMA);
public static final String SYSTEM_CATALOG_NAME =
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE);
- public static final byte[] SYSTEM_CATALOG_NAME_BYTES =
SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES,
SYSTEM_CATALOG_SCHEMA_BYTES);
+ public static final byte[] SYSTEM_CATALOG_NAME_BYTES =
Bytes.toBytes(SYSTEM_CATALOG_NAME);
public static final String SYSTEM_STATS_TABLE = "STATS";
public static final String SYSTEM_STATS_NAME =
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE);
public static final byte[] SYSTEM_STATS_NAME_BYTES =
Bytes.toBytes(SYSTEM_STATS_NAME);
@@ -197,8 +197,13 @@ public class PhoenixDatabaseMetaData implements
DatabaseMetaData, org.apache.pho
public static final String TYPE_SEQUENCE = "SEQUENCE";
public static final byte[] SEQUENCE_FAMILY_BYTES =
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
- public static final String SEQUENCE_TABLE_NAME = SYSTEM_CATALOG_SCHEMA +
".\"" + TYPE_SEQUENCE + "\"";
- public static final byte[] SEQUENCE_TABLE_NAME_BYTES =
SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, TYPE_SEQUENCE);
+ public static final String SEQUENCE_SCHEMA_NAME = SYSTEM_CATALOG_SCHEMA;
+ public static final byte[] SEQUENCE_SCHEMA_NAME_BYTES =
Bytes.toBytes(SEQUENCE_SCHEMA_NAME);
+ public static final String SEQUENCE_TABLE_NAME = TYPE_SEQUENCE;
+ public static final byte[] SEQUENCE_TABLE_NAME_BYTES =
Bytes.toBytes(SEQUENCE_TABLE_NAME);
+ public static final String SEQUENCE_FULLNAME_ESCAPED =
SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"";
+ public static final String SEQUENCE_FULLNAME =
SchemaUtil.getTableName(SEQUENCE_SCHEMA_NAME, SEQUENCE_TABLE_NAME);
+ public static final byte[] SEQUENCE_FULLNAME_BYTES =
Bytes.toBytes(SEQUENCE_FULLNAME);
public static final String SEQUENCE_SCHEMA = "SEQUENCE_SCHEMA";
public static final String SEQUENCE_NAME = "SEQUENCE_NAME";
public static final String CURRENT_VALUE = "CURRENT_VALUE";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 04646a7..127cd2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 21b124c..1da68e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -17,14 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.StringReader;
import java.sql.DriverManager;
@@ -59,13 +51,14 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.StringReader;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
/**
* MapReduce mapper that converts CSV input lines into KeyValues that can be
written to HFiles.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 3ae0e81..0672c37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -110,4 +110,5 @@ public interface ConnectionQueryServices extends
QueryServices, MetaDataMutated
public void addTableStats(String physicalName, PTableStats tableStats);
public void clearCache() throws SQLException;
+ public int getSequenceSaltBuckets();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 2154f8d..f1d12fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -150,7 +150,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
private HConnection connection;
private volatile boolean initialized;
-
+ private volatile int nSequenceSaltBuckets;
+
// writes guarded by "this"
private volatile boolean closed;
@@ -1333,21 +1334,29 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// A TableAlreadyExistsException is not
thrown, since the table only exists *after* this fixed timestamp.
} catch (TableAlreadyExistsException ignore) {
}
+ int nSaltBuckets =
ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+
QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
try {
-
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
- } catch (NewerTableAlreadyExistsException ignore) {
+ String createSequenceTable =
Sequence.getCreateTableStatement(nSaltBuckets);
+
metaConnection.createStatement().executeUpdate(createSequenceTable);
+ nSequenceSaltBuckets = nSaltBuckets;
+ } catch (NewerTableAlreadyExistsException e) {
// Ignore, as this will happen if the
SYSTEM.SEQUENCE already exists at this fixed timestamp.
// A TableAlreadyExistsException is not
thrown, since the table only exists *after* this fixed timestamp.
- } catch (TableAlreadyExistsException ignore) {
+ PTable sequenceTable =
ConnectionQueryServicesImpl.this.latestMetaData.getTable(new PTableKey(null,
PhoenixDatabaseMetaData.SEQUENCE_FULLNAME));
+ Integer sequenceSaltBuckets =
sequenceTable.getBucketNum();
+ nSequenceSaltBuckets = sequenceSaltBuckets ==
null ? 0 : sequenceSaltBuckets;
+ } catch (TableAlreadyExistsException e) {
// This will occur if we have an older
SYSTEM.SEQUENCE, so we need to update it to include
// any new columns we've added.
- if
(UpgradeUtil.addSaltByteToSequenceTable(metaConnection)) {
+ if
(UpgradeUtil.addSaltByteToSequenceTable(metaConnection, nSaltBuckets)) {
metaConnection.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.TYPE_SEQUENCE,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP);
clearCache();
}
+ nSequenceSaltBuckets = nSaltBuckets;
}
try {
metaConnection.createStatement().executeUpdate(
@@ -1492,7 +1501,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
public long createSequence(String tenantId, String schemaName, String
sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue,
long maxValue,
boolean cycle, long timestamp) throws SQLException {
- SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName,
sequenceName);
+ SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName,
sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
@@ -1503,7 +1512,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// Now that we have the lock we need, create the sequence
Append append = sequence.createSequence(startWith, incrementBy,
cacheSize, timestamp, minValue, maxValue, cycle);
HTableInterface htable =
-
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
Result result = htable.append(append);
return sequence.createSequence(result, minValue, maxValue,
cycle);
@@ -1519,7 +1528,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
@Override
public long dropSequence(String tenantId, String schemaName, String
sequenceName, long timestamp) throws SQLException {
- SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName,
sequenceName);
+ SequenceKey sequenceKey = new SequenceKey(tenantId, schemaName,
sequenceName, nSequenceSaltBuckets);
Sequence newSequences = new Sequence(sequenceKey);
Sequence sequence = sequenceMap.putIfAbsent(sequenceKey, newSequences);
if (sequence == null) {
@@ -1529,7 +1538,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
sequence.getLock().lock();
// Now that we have the lock we need, create the sequence
Append append = sequence.dropSequence(timestamp);
- HTableInterface htable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface htable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
Result result = htable.append(append);
return sequence.dropSequence(result);
@@ -1627,7 +1636,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
if (toIncrementList.isEmpty()) {
return;
}
- HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
Object[] resultObjects = null;
SQLException sqlE = null;
try {
@@ -1698,7 +1707,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
if (toReturnList.isEmpty()) {
return;
}
- HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
Object[] resultObjects = null;
SQLException sqlE = null;
try {
@@ -1748,7 +1757,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
if (mutations.isEmpty()) {
return;
}
- HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
+ HTableInterface hTable =
this.getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
SQLException sqlE = null;
try {
hTable.batch(mutations);
@@ -1881,4 +1890,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
public void addTableStats(String physicalName, PTableStats tableStats) {
tableStatsCache.put(physicalName, tableStats);
}
+ @Override
+ public int getSequenceSaltBuckets() {
+ return nSequenceSaltBuckets;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 3cd7917..841f93b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -218,7 +218,9 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
// A TableAlreadyExistsException is not thrown, since the
table only exists *after* this fixed timestamp.
}
try {
-
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_SEQUENCE_METADATA);
+ int nSaltBuckets = getSequenceSaltBuckets();
+ String createTableStatement =
Sequence.getCreateTableStatement(nSaltBuckets);
+
metaConnection.createStatement().executeUpdate(createTableStatement);
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.SEQUENCE
already exists at this fixed timestamp.
// A TableAlreadyExistsException is not thrown, since the
table only exists *after* this fixed timestamp.
@@ -310,7 +312,7 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
public long createSequence(String tenantId, String schemaName, String
sequenceName,
long startWith, long incrementBy, long cacheSize, long minValue,
long maxValue,
boolean cycle, long timestamp) throws SQLException {
- SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName,
getSequenceSaltBuckets());
if (sequenceMap.get(key) != null) {
throw new SequenceAlreadyExistsException(schemaName, sequenceName);
}
@@ -320,7 +322,7 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
@Override
public long dropSequence(String tenantId, String schemaName, String
sequenceName, long timestamp) throws SQLException {
- SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName);
+ SequenceKey key = new SequenceKey(tenantId, schemaName, sequenceName,
getSequenceSaltBuckets());
if (sequenceMap.remove(key) == null) {
throw new SequenceNotFoundException(schemaName, sequenceName);
}
@@ -429,4 +431,10 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
@Override
public void clearCache() throws SQLException {
}
+
+ @Override
+ public int getSequenceSaltBuckets() {
+ return getProps().getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,
+ QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 2bcacc6..34bca4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -248,4 +248,9 @@ public class DelegateConnectionQueryServices extends
DelegateQueryServices imple
public void clearCache() throws SQLException {
getDelegate().clearCache();
}
+
+ @Override
+ public int getSequenceSaltBuckets() {
+ return getDelegate().getSequenceSaltBuckets();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index d2d25fe..bc90c86 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -99,7 +99,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.util.ByteUtil;
@@ -267,7 +266,5 @@ public interface QueryConstants {
CYCLE_FLAG + " BOOLEAN, \n" +
LIMIT_REACHED_FLAG + " BOOLEAN \n" +
" CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" +
- HConstants.VERSIONS + "=" +
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
- "SALT_BUCKETS=" + SaltingUtil.MAX_BUCKET_NUM + "\n";
-
+ HConstants.VERSIONS + "=" +
MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + "\n";
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ecf4818..a8536a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -125,6 +125,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB =
"phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB =
"phoenix.stats.guidepost.per.region";
+ public static final String SEQUENCE_SALT_BUCKETS_ATTRIB =
"phoenix.sequence.saltBuckets";
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 5fdf165..a208ac5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -47,6 +47,7 @@ import static
org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
import static
org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
import static
org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
@@ -60,6 +61,7 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.WALEditCodec;
+import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -131,8 +133,14 @@ public class QueryServicesOptions {
public static final long DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE = 1024 * 1024
* 30;
public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min
public static final int DEFAULT_GUIDE_POSTS_PER_REGION = 20;
- public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS =
DEFAULT_STATS_UPDATE_FREQ_MS/2;
+ public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
+
+ /**
+ * Use only first time SYSTEM.SEQUENCE table is created.
+ */
+ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS =
SaltingUtil.MAX_BUCKET_NUM;
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
@@ -427,4 +435,10 @@ public class QueryServicesOptions {
public QueryServicesOptions setMinStatsUpdateFrequencyMs(int frequencyMs) {
return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs);
}
+
+ public QueryServicesOptions setSequenceSaltBuckets(int saltBuckets) {
+ config.setInt(SEQUENCE_SALT_BUCKETS_ATTRIB, saltBuckets);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9d50a07..e950903 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -791,7 +791,8 @@ public class MetaDataClient {
PName tenantId = connection.getTenantId();
String tenantIdStr = tenantId == null ? null :
connection.getTenantId().getString();
PName physicalName = dataTable.getPhysicalName();
- SequenceKey key =
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName);
+ int nSequenceSaltBuckets =
connection.getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key =
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
nSequenceSaltBuckets);
// Create at parent timestamp as we know that will be
earlier than now
// and earlier than any SCN if one is set.
createSequence(key.getTenantId(), key.getSchemaName(),
key.getSequenceName(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index dc3d2b1..b7f0f3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -538,4 +538,11 @@ public class Sequence {
.setTableName(key.getSequenceName())
.build().buildException();
}
+
+ public static String getCreateTableStatement(int nSaltBuckets) {
+ if (nSaltBuckets <= 0) {
+ return QueryConstants.CREATE_SEQUENCE_METADATA;
+ }
+ return QueryConstants.CREATE_SEQUENCE_METADATA + "," +
PhoenixDatabaseMetaData.SALT_BUCKETS + "=" + nSaltBuckets;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
index c25e438..94ca549 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceKey.java
@@ -28,12 +28,14 @@ public class SequenceKey implements Comparable<SequenceKey>
{
private final String sequenceName;
private final byte[] key;
- public SequenceKey(String tenantId, String schemaName, String
sequenceName) {
+ public SequenceKey(String tenantId, String schemaName, String
sequenceName, int nBuckets) {
this.tenantId = tenantId;
this.schemaName = schemaName;
this.sequenceName = sequenceName;
- this.key = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY,
tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId),
QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ?
ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName),
QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
- key[0] = SaltingUtil.getSaltingByte(key,
SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES,
SaltingUtil.MAX_BUCKET_NUM);
+ this.key = ByteUtil.concat(nBuckets <= 0 ? ByteUtil.EMPTY_BYTE_ARRAY :
QueryConstants.SEPARATOR_BYTE_ARRAY, tenantId == null ?
ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(tenantId),
QueryConstants.SEPARATOR_BYTE_ARRAY, schemaName == null ?
ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(schemaName),
QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(sequenceName));
+ if (nBuckets > 0) {
+ key[0] = SaltingUtil.getSaltingByte(key,
SaltingUtil.NUM_SALTING_BYTES, key.length - SaltingUtil.NUM_SALTING_BYTES,
SaltingUtil.MAX_BUCKET_NUM);
+ }
}
public byte[] getKey() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index dfeeddb..b7d6d98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -259,14 +259,15 @@ public class MetaDataUtil {
return schemaName;
}
- public static SequenceKey getViewIndexSequenceKey(String tenantId, PName
physicalName) {
+
+ public static SequenceKey getViewIndexSequenceKey(String tenantId, PName
physicalName, int nSaltBuckets) {
// Create global sequence of the form: <prefixed base table name><tenant id>
// rather than tenant-specific sequence, as it makes it much easier
// to cleanup when the physical table is dropped, as we can delete
// all global sequences leading with <prefix> + physical name.
String schemaName = VIEW_INDEX_SEQUENCE_PREFIX +
physicalName.getString();
String tableName = tenantId == null ? "" : tenantId;
- return new SequenceKey(null, schemaName, tableName);
+ return new SequenceKey(null, schemaName, tableName, nSaltBuckets);
}
public static PDataType getViewIndexIdDataType() {
@@ -296,8 +297,9 @@ public class MetaDataUtil {
}
public static void deleteViewIndexSequences(PhoenixConnection connection,
PName name) throws SQLException {
- SequenceKey key = getViewIndexSequenceKey(null, name);
- connection.createStatement().executeUpdate("DELETE FROM " +
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME +
+ int nSequenceSaltBuckets =
connection.getQueryServices().getSequenceSaltBuckets();
+ SequenceKey key = getViewIndexSequenceKey(null, name,
nSequenceSaltBuckets);
+ connection.createStatement().executeUpdate("DELETE FROM " +
PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED +
" WHERE " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL AND
" +
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" +
key.getSchemaName() + "'");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index b5837de..4fc78df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -344,7 +344,7 @@ public class SchemaUtil {
}
public static boolean isSequenceTable(byte[] tableName) {
- return Bytes.compareTo(tableName,
PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0;
+ return Bytes.compareTo(tableName,
PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES) == 0;
}
public static boolean isMetaTable(PTable table) {
@@ -352,7 +352,7 @@ public class SchemaUtil {
}
public static boolean isMetaTable(byte[] schemaName, byte[] tableName) {
- return Bytes.compareTo(schemaName,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES) == 0 &&
Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES)
== 0;
+ return Bytes.compareTo(schemaName,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA_BYTES) == 0 &&
Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE_BYTES)
== 0;
}
public static boolean isMetaTable(String schemaName, String tableName) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 4c8a369..3054200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -45,105 +45,134 @@ public class UpgradeUtil {
private UpgradeUtil() {
}
- public static boolean addSaltByteToSequenceTable(PhoenixConnection conn)
throws SQLException {
+ public static boolean addSaltByteToSequenceTable(PhoenixConnection conn,
int nSaltBuckets) throws SQLException {
+ if (nSaltBuckets <= 0) {
+ logger.info("Not upgrading SYSTEM.SEQUENCE table because
SALT_BUCKETS is zero");
+ return false;
+ }
logger.info("Upgrading SYSTEM.SEQUENCE table");
+ byte[] seqTableKey = SchemaUtil.getTableKey(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.TYPE_SEQUENCE);
HTableInterface sysTable =
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
try {
- byte[] seqTableKey = SchemaUtil.getTableKey(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.TYPE_SEQUENCE);
logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to "
+ SaltingUtil.MAX_BUCKET_NUM);
KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
- PDataType.INTEGER.toBytes(SaltingUtil.MAX_BUCKET_NUM));
- Put put = new Put(seqTableKey);
- put.add(saltKV);
+ PDataType.INTEGER.toBytes(nSaltBuckets));
+ Put saltPut = new Put(seqTableKey);
+ saltPut.add(saltKV);
// Prevent multiple clients from doing this upgrade
if (!sysTable.checkAndPut(seqTableKey,
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, put)) {
+ PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null,
saltPut)) {
logger.info("SYSTEM.SEQUENCE table has already been upgraded");
return false;
}
- } catch (IOException e) {
- throw ServerUtil.parseServerException(e);
- } finally {
- try {
- sysTable.close();
- } catch (IOException e) {
- logger.warn("Exception during close",e);
- }
- }
- int batchSizeBytes = 100 * 1024; // 100K chunks
- int sizeBytes = 0;
- List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
- boolean success = false;
- Scan scan = new Scan();
- scan.setRaw(true);
- scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
- HTableInterface seqTable =
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES);
- try {
- logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
- ResultScanner scanner = seqTable.getScanner(scan);
+ int batchSizeBytes = 100 * 1024; // 100K chunks
+ int sizeBytes = 0;
+ List<Mutation> mutations =
Lists.newArrayListWithExpectedSize(10000);
+
+ boolean success = false;
+ Scan scan = new Scan();
+ scan.setRaw(true);
+
scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+ HTableInterface seqTable =
conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
try {
- Result result;
- while ((result = scanner.next()) != null) {
- for (KeyValue keyValue : result.raw()) {
- KeyValue newKeyValue = addSaltByte(keyValue);
- sizeBytes += newKeyValue.getLength();
- if (KeyValue.Type.codeToType(newKeyValue.getType()) ==
KeyValue.Type.Put) {
- // Delete old value
- byte[] buf = keyValue.getBuffer();
- Delete delete = new Delete(keyValue.getRow());
- KeyValue deleteKeyValue = new KeyValue(buf,
keyValue.getRowOffset(), keyValue.getRowLength(),
- buf, keyValue.getFamilyOffset(),
keyValue.getFamilyLength(),
- buf, keyValue.getQualifierOffset(),
keyValue.getQualifierLength(),
- keyValue.getTimestamp(),
KeyValue.Type.Delete,
- ByteUtil.EMPTY_BYTE_ARRAY,0,0);
- delete.addDeleteMarker(deleteKeyValue);
- mutations.add(delete);
- sizeBytes += deleteKeyValue.getLength();
- // Put new value
- Put put = new Put(newKeyValue.getRow());
- put.add(newKeyValue);
- mutations.add(put);
- } else if
(KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
- // Copy delete marker using new key so that it continues
- // to delete the key value preceding it that will be updated
- // as well.
- Delete delete = new Delete(newKeyValue.getRow());
- delete.addDeleteMarker(newKeyValue);
- mutations.add(delete);
+ boolean committed = false;
+ logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
+ ResultScanner scanner = seqTable.getScanner(scan);
+ try {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ for (KeyValue keyValue : result.raw()) {
+ KeyValue newKeyValue = addSaltByte(keyValue);
+ sizeBytes += newKeyValue.getLength();
+ if
(KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
+ // Delete old value
+ byte[] buf = keyValue.getBuffer();
+ Delete delete = new Delete(keyValue.getRow());
+ KeyValue deleteKeyValue = new KeyValue(buf,
keyValue.getRowOffset(), keyValue.getRowLength(),
+ buf, keyValue.getFamilyOffset(),
keyValue.getFamilyLength(),
+ buf, keyValue.getQualifierOffset(),
keyValue.getQualifierLength(),
+ keyValue.getTimestamp(),
KeyValue.Type.Delete,
+ ByteUtil.EMPTY_BYTE_ARRAY,0,0);
+ delete.addDeleteMarker(deleteKeyValue);
+ mutations.add(delete);
+ sizeBytes += deleteKeyValue.getLength();
+ // Put new value
+ Put put = new Put(newKeyValue.getRow());
+ put.add(newKeyValue);
+ mutations.add(put);
+ } else if
(KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
+ // Copy delete marker using new key so that it continues
+ // to delete the key value preceding it that will be updated
+ // as well.
+ Delete delete = new
Delete(newKeyValue.getRow());
+ delete.addDeleteMarker(newKeyValue);
+ mutations.add(delete);
+ }
+ if (sizeBytes >= batchSizeBytes) {
+ logger.info("Committing bactch of
SYSTEM.SEQUENCE rows");
+ seqTable.batch(mutations);
+ mutations.clear();
+ sizeBytes = 0;
+ committed = true;
+ }
}
- if (sizeBytes >= batchSizeBytes) {
- logger.info("Committing bactch of SYSTEM.SEQUENCE
rows");
- seqTable.batch(mutations);
- mutations.clear();
- sizeBytes = 0;
+ }
+ if (!mutations.isEmpty()) {
+ logger.info("Committing last bactch of SYSTEM.SEQUENCE
rows");
+ seqTable.batch(mutations);
+ }
+ logger.info("Successfully completed upgrade of
SYSTEM.SEQUENCE");
+ success = true;
+ return true;
+ } catch (InterruptedException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ try {
+ scanner.close();
+ } finally {
+ if (!success) {
+ if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
+ // Don't use Delete here as we'd never be able to change it again at this timestamp.
+ KeyValue unsaltKV =
KeyValueUtil.newKeyValue(seqTableKey,
+
PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+
PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
+
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
+ PDataType.INTEGER.toBytes(0));
+ Put unsaltPut = new Put(seqTableKey);
+ unsaltPut.add(unsaltKV);
+ try {
+ sysTable.put(unsaltPut);
+ success = true;
+ } finally {
+ if (!success)
logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
+ }
+ } else { // We're screwed b/c we've already committed some salted sequences...
+ logger.error("SYSTEM.SEQUENCE TABLE LEFT IN
CORRUPT STATE");
+ }
}
}
}
- if (!mutations.isEmpty()) {
- logger.info("Committing last bactch of SYSTEM.SEQUENCE
rows");
- seqTable.batch(mutations);
- }
- logger.info("Successfully completed upgrade of
SYSTEM.SEQUENCE");
- success = true;
- return true;
- } catch (InterruptedException e) {
+ } catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
- if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN
CORRUPT STATE");
- scanner.close();
+ try {
+ seqTable.close();
+ } catch (IOException e) {
+ logger.warn("Exception during close",e);
+ }
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
try {
- seqTable.close();
+ sysTable.close();
} catch (IOException e) {
logger.warn("Exception during close",e);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 1e3f004..cfa8787 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -793,7 +793,7 @@ public abstract class BaseTest {
ResultSet rs = conn.createStatement().executeQuery("SELECT "
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_NAME
- + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME);
+ + " FROM " +
PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED);
while (rs.next()) {
try {
conn.createStatement().execute("DROP SEQUENCE " +
SchemaUtil.getTableName(rs.getString(1), rs.getString(2)));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b95498c5/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 16845e8..a9b4c7d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -32,8 +32,8 @@ import org.apache.phoenix.util.ReadOnlyProps;
*/
public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
- private static final int DEFAULT_THREAD_POOL_SIZE = 16;
- private static final int DEFAULT_QUEUE_SIZE = 5000;
+ private static final int DEFAULT_THREAD_POOL_SIZE = 20;
+ private static final int DEFAULT_QUEUE_SIZE = 0;
// TODO: setting this down to 5mb causes insufficient memory exceptions. Need to investigate why
private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap
private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000*5; //5min
@@ -51,15 +51,21 @@ public final class QueryServicesTestImpl extends
BaseQueryServicesImpl {
private static final String DEFAULT_WAL_EDIT_CODEC =
IndexedWALEditCodec.class.getName();
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE =
1024L*1024L*4L; // 4 Mb
public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE =
1024L*1024L*2L; // 2 Mb
-
public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0;
+ /**
+ * Set number of salt buckets lower for sequence table during testing, as a high
+ * value overwhelms our mini clusters.
+ */
+ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 4;
+
public QueryServicesTestImpl(ReadOnlyProps defaultProps) {
this(defaultProps, ReadOnlyProps.EMPTY_PROPS);
}
private static QueryServicesOptions getDefaultServicesOptions() {
return withDefaults()
+ .setSequenceSaltBuckets(DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS)
.setMinStatsUpdateFrequencyMs(DEFAULT_MIN_STATS_UPDATE_FREQ_MS)
.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
.setQueueSize(DEFAULT_QUEUE_SIZE)