This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new 01405c3 PHOENIX-5138 - ViewIndexId sequences created after
PHOENIX-5132 shouldn't collide with ones created before it
01405c3 is described below
commit 01405c3b112fc5afcfed4aff35bc551d90bd4feb
Author: Geoffrey Jacoby <[email protected]>
AuthorDate: Tue Apr 9 13:35:21 2019 -0700
PHOENIX-5138 - ViewIndexId sequences created after PHOENIX-5132 shouldn't
collide with ones created before it
---
.../java/org/apache/phoenix/end2end/UpgradeIT.java | 118 +++++++++++++++++----
.../apache/phoenix/end2end/index/ViewIndexIT.java | 10 +-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 48 +++++----
.../phoenix/query/ConnectionQueryServicesImpl.java | 6 ++
.../java/org/apache/phoenix/util/MetaDataUtil.java | 30 +++++-
.../java/org/apache/phoenix/util/UpgradeUtil.java | 112 ++++++++++++++++++-
6 files changed, 272 insertions(+), 52 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
index a27d8dc..88f1062 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
@@ -21,6 +21,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -30,6 +31,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -38,6 +40,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -59,6 +62,9 @@ import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SequenceAllocation;
+import org.apache.phoenix.schema.SequenceKey;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -178,25 +184,10 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
}
PName tenantId = phxConn.getTenantId();
PName physicalName = PNameFactory.newName(hbaseTableName);
- String oldSchemaName =
MetaDataUtil.getViewIndexSequenceSchemaName(PNameFactory.newName(phoenixFullTableName),
- false);
String newSchemaName =
MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
String newSequenceName =
MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
- ResultSet rs = phxConn.createStatement()
- .executeQuery("SELECT " +
PhoenixDatabaseMetaData.CURRENT_VALUE + " FROM "
- + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + "
WHERE " + PhoenixDatabaseMetaData.TENANT_ID
- + " IS NULL AND " +
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + newSchemaName
- + "' AND " + PhoenixDatabaseMetaData.SEQUENCE_NAME
+ "='" + newSequenceName + "'");
- assertTrue(rs.next());
- assertEquals("-9223372036854775805", rs.getString(1));
- rs = phxConn.createStatement().executeQuery("SELECT " +
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
- + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + "," +
PhoenixDatabaseMetaData.CURRENT_VALUE + " FROM "
- + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " +
PhoenixDatabaseMetaData.TENANT_ID
- + " IS NULL AND " +
PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + oldSchemaName + "'");
- assertFalse(rs.next());
- phxConn.close();
+ verifySequenceValue(null, newSequenceName, newSchemaName,
-9223372036854775805L);
admin.close();
-
}
}
@@ -507,12 +498,20 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
return DriverManager.getConnection(getUrl(), props);
}
- private Connection getConnection(boolean tenantSpecific, String tenantId)
throws SQLException {
+ private Connection getConnection(boolean tenantSpecific, String tenantId,
boolean isNamespaceMappingEnabled)
+ throws SQLException {
if (tenantSpecific) {
checkNotNull(tenantId);
return createTenantConnection(tenantId);
}
- return DriverManager.getConnection(getUrl());
+ Properties props = new Properties();
+ if (isNamespaceMappingEnabled){
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
"true");
+ }
+ return DriverManager.getConnection(getUrl(), props);
+ }
+ private Connection getConnection(boolean tenantSpecific, String tenantId)
throws SQLException {
+ return getConnection(tenantSpecific, tenantId, false);
}
@Test
@@ -588,4 +587,87 @@ public class UpgradeIT extends ParallelStatsDisabledIT {
return childLinkSet;
}
+ @Test
+ public void testMergeViewIndexSequences() throws Exception {
+ testMergeViewIndexSequencesHelper(false);
+ }
+
+ @Test
+ public void testMergeViewIndexSequencesWithNamespaces() throws Exception {
+ testMergeViewIndexSequencesHelper(true);
+ }
+
+ private void testMergeViewIndexSequencesHelper(boolean
isNamespaceMappingEnabled) throws Exception {
+ PhoenixConnection conn = getConnection(false, null,
isNamespaceMappingEnabled).unwrap(PhoenixConnection.class);
+ ConnectionQueryServices cqs = conn.getQueryServices();
+ //First delete any sequences that may exist from previous tests
+ conn.createStatement().execute("DELETE FROM " +
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE);
+ conn.commit();
+ cqs.clearCache();
+ //Now make sure that running the merge logic doesn't cause a problem
when there are no
+ //sequences
+ UpgradeUtil.mergeViewIndexIdSequences(cqs, conn);
+ PName tenantOne = PNameFactory.newName("TENANT_ONE");
+ PName tenantTwo = PNameFactory.newName("TENANT_TWO");
+ String tableName =
+ SchemaUtil.getPhysicalHBaseTableName("TEST",
+ "T_" + generateUniqueName(),
isNamespaceMappingEnabled).getString();
+ PName viewIndexTable =
PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(tableName));
+ SequenceKey sequenceOne =
+ createViewIndexSequenceWithOldName(cqs, tenantOne, viewIndexTable,
isNamespaceMappingEnabled);
+ SequenceKey sequenceTwo =
+ createViewIndexSequenceWithOldName(cqs, tenantTwo, viewIndexTable,
isNamespaceMappingEnabled);
+ SequenceKey sequenceGlobal =
+ createViewIndexSequenceWithOldName(cqs, null, viewIndexTable,
isNamespaceMappingEnabled);
+
+ List<SequenceAllocation> allocations = Lists.newArrayList();
+ long val1 = 10;
+ long val2 = 100;
+ long val3 = 1000;
+ allocations.add(new SequenceAllocation(sequenceOne, val1));
+ allocations.add(new SequenceAllocation(sequenceGlobal, val2));
+ allocations.add(new SequenceAllocation(sequenceTwo, val3));
+
+
+ long[] incrementedValues = new long[3];
+ SQLException[] exceptions = new SQLException[3];
+ //simulate incrementing the view indexes
+ cqs.incrementSequences(allocations,
EnvironmentEdgeManager.currentTimeMillis(), incrementedValues,
+ exceptions);
+ for (SQLException e : exceptions) {
+ assertNull(e);
+ }
+
+ UpgradeUtil.mergeViewIndexIdSequences(cqs, conn);
+ //now check that there exists a sequence using the new naming
convention, whose value is the
+ //max of all the previous sequences for this table.
+
+ List<SequenceAllocation> afterUpgradeAllocations =
Lists.newArrayList();
+ SequenceKey sequenceUpgrade =
MetaDataUtil.getViewIndexSequenceKey(null, viewIndexTable, 0,
isNamespaceMappingEnabled);
+ afterUpgradeAllocations.add(new SequenceAllocation(sequenceUpgrade,
1));
+ long[] afterUpgradeValues = new long[1];
+ SQLException[] afterUpgradeExceptions = new SQLException[1];
+ cqs.incrementSequences(afterUpgradeAllocations,
EnvironmentEdgeManager.currentTimeMillis(), afterUpgradeValues,
afterUpgradeExceptions);
+
+ assertNull(afterUpgradeExceptions[0]);
+ int safetyIncrement = 100;
+ if (isNamespaceMappingEnabled){
+ //since one sequence (the global one) will be reused as the "new"
sequence,
+ // it's already in cache and will reflect the final increment
immediately
+ assertEquals(Long.MIN_VALUE + val3 + safetyIncrement + 1,
afterUpgradeValues[0]);
+ } else {
+ assertEquals(Long.MIN_VALUE + val3 + safetyIncrement,
afterUpgradeValues[0]);
+ }
+ }
+
+ private SequenceKey
createViewIndexSequenceWithOldName(ConnectionQueryServices cqs, PName tenant,
PName viewIndexTable, boolean isNamespaceMapped) throws SQLException {
+ String tenantId = tenant == null ? null : tenant.getString();
+ SequenceKey key = MetaDataUtil.getOldViewIndexSequenceKey(tenantId,
viewIndexTable, 0, isNamespaceMapped);
+ //Sequences are owned globally even if they contain a tenantId in the
name
+ String sequenceTenantId = isNamespaceMapped ? tenantId : null;
+ cqs.createSequence(sequenceTenantId, key.getSchemaName(),
key.getSequenceName(),
+ Long.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false,
EnvironmentEdgeManager.currentTimeMillis());
+ return key;
+ }
+
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
index 4de7034..3126ee4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java
@@ -153,15 +153,9 @@ public class ViewIndexIT extends SplitSystemCatalogIT {
conn2.createStatement().executeQuery("SELECT * FROM " +
fullTableName).next();
String sequenceName =
getViewIndexSequenceName(PNameFactory.newName(fullTableName), null,
isNamespaceMapped);
String sequenceSchemaName =
getViewIndexSequenceSchemaName(PNameFactory.newName(fullTableName),
isNamespaceMapped);
- String seqName =
getViewIndexSequenceName(PNameFactory.newName(fullTableName), null,
!isNamespaceMapped);
- String seqSchemaName =
getViewIndexSequenceSchemaName(PNameFactory.newName(fullTableName),
!isNamespaceMapped);
- verifySequenceValue(null, sequenceName, sequenceSchemaName,
-9223372036854775807L);
- verifySequenceValue(null, sequenceName, sequenceSchemaName,
-9223372036854775807L);
+ verifySequenceValue(null, sequenceName, sequenceSchemaName,
Long.MIN_VALUE + 1);
conn1.createStatement().execute("CREATE INDEX " + indexName + "_2 ON "
+ fullViewName + " (v1)");
- verifySequenceValue(null, sequenceName, sequenceSchemaName,
-9223372036854775806L);
- // Check other format of sequence is not there as Sequences format is
different for views/indexes created on
- // table which are namespace mapped and which are not.
- verifySequenceNotExists(null, seqName, seqSchemaName);
+ verifySequenceValue(null, sequenceName, sequenceSchemaName,
Long.MIN_VALUE + 2);
conn1.createStatement().execute("DROP VIEW " + fullViewName);
conn1.createStatement().execute("DROP TABLE "+ fullTableName);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 141f637..fa374a7 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2347,26 +2347,7 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
String tenantIdStr = tenantIdBytes.length == 0 ? null :
Bytes.toString(tenantIdBytes);
try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
{
PName physicalName = parentTable.getPhysicalName();
- int nSequenceSaltBuckets =
connection.getQueryServices().getSequenceSaltBuckets();
- SequenceKey key =
MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
- nSequenceSaltBuckets,
parentTable.isNamespaceMapped() );
- // TODO Review Earlier sequence was created at
(SCN-1/LATEST_TIMESTAMP) and incremented at the client
max(SCN,dataTable.getTimestamp), but it seems we should
- // use always LATEST_TIMESTAMP to avoid seeing wrong
sequence values by different connection having SCN
- // or not.
- long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
- try {
-
connection.getQueryServices().createSequence(key.getTenantId(),
key.getSchemaName(), key.getSequenceName(),
- Long.MIN_VALUE, 1, 1, Long.MIN_VALUE,
Long.MAX_VALUE, false, sequenceTimestamp);
- } catch (SequenceAlreadyExistsException e) {
- }
- long[] seqValues = new long[1];
- SQLException[] sqlExceptions = new SQLException[1];
-
connection.getQueryServices().incrementSequences(Collections.singletonList(new
SequenceAllocation(key, 1)),
- HConstants.LATEST_TIMESTAMP, seqValues,
sqlExceptions);
- if (sqlExceptions[0] != null) {
- throw sqlExceptions[0];
- }
- long seqValue = seqValues[0];
+ long seqValue = getViewIndexSequenceValue(connection,
tenantIdStr, parentTable, physicalName);
Put tableHeaderPut =
MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
NavigableMap<byte[], List<Cell>> familyCellMap =
tableHeaderPut.getFamilyCellMap();
@@ -2481,6 +2462,33 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
}
}
+ private long getViewIndexSequenceValue(PhoenixConnection connection,
String tenantIdStr, PTable parentTable, PName physicalName) throws SQLException
{
+ int nSequenceSaltBuckets =
connection.getQueryServices().getSequenceSaltBuckets();
+
+ SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr,
physicalName,
+ nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
+ // Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and
incremented at the client max(SCN,dataTable.getTimestamp), but it seems we
should
+ // always use LATEST_TIMESTAMP to avoid seeing wrong sequence values
by different connection having SCN
+ // or not.
+ long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
+ try {
+ connection.getQueryServices().createSequence(key.getTenantId(),
key.getSchemaName(), key.getSequenceName(),
+ Long.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false,
sequenceTimestamp);
+ } catch (SequenceAlreadyExistsException e) {
+ //someone else got here first and created the sequence, or it was
pre-existing. Not a problem.
+ }
+
+
+ long[] seqValues = new long[1];
+ SQLException[] sqlExceptions = new SQLException[1];
+
connection.getQueryServices().incrementSequences(Collections.singletonList(new
SequenceAllocation(key, 1)),
+ HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
+ if (sqlExceptions[0] != null) {
+ throw sqlExceptions[0];
+ }
+ return seqValues[0];
+ }
+
public static void dropChildViews(RegionCoprocessorEnvironment env, byte[]
tenantIdBytes, byte[] schemaName, byte[] tableName)
throws IOException, SQLException, ClassNotFoundException {
Table hTable =
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 394b6ea..1761ec4 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -71,7 +71,9 @@ import static
org.apache.phoenix.util.UpgradeUtil.syncTableAndIndexProperties;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
@@ -225,6 +227,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SequenceAllocation;
+import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.SystemFunctionSplitPolicy;
@@ -3418,6 +3421,9 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// See PHOENIX-3955
if (currentServerSideTableTimeStamp <
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) {
syncTableAndIndexProperties(metaConnection, getAdmin());
+ //Combine view index id sequences for the same physical
view index table
+ //to avoid collisions. See PHOENIX-5132 and PHOENIX-5138
+ UpgradeUtil.mergeViewIndexIdSequences(this,
metaConnection);
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 1a911c8..44624c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -616,13 +616,39 @@ public class MetaDataUtil {
}
}
- public static String getViewIndexSequenceSchemaName(PName physicalName,
boolean isNamespaceMapped) {
+ public static String getOldViewIndexSequenceSchemaName(PName physicalName,
boolean isNamespaceMapped) {
if (!isNamespaceMapped) { return VIEW_INDEX_SEQUENCE_PREFIX +
physicalName.getString(); }
return SchemaUtil.getSchemaNameFromFullName(physicalName.toString());
}
+ public static String getOldViewIndexSequenceName(PName physicalName, PName
tenantId, boolean isNamespaceMapped) {
+ if (!isNamespaceMapped) { return VIEW_INDEX_SEQUENCE_NAME_PREFIX +
(tenantId == null ? "" : tenantId); }
+ return SchemaUtil.getTableNameFromFullName(physicalName.toString()) +
VIEW_INDEX_SEQUENCE_NAME_PREFIX;
+ }
+
+ public static SequenceKey getOldViewIndexSequenceKey(String tenantId,
PName physicalName, int nSaltBuckets,
+ boolean
isNamespaceMapped) {
+ // Create global sequence of the form: <prefixed base table
name><tenant id>
+ // rather than tenant-specific sequence, as it makes it much easier
+ // to cleanup when the physical table is dropped, as we can delete
+ // all global sequences leading with <prefix> + physical name.
+ String schemaName = getOldViewIndexSequenceSchemaName(physicalName,
isNamespaceMapped);
+ String tableName = getOldViewIndexSequenceName(physicalName,
PNameFactory.newName(tenantId), isNamespaceMapped);
+ return new SequenceKey(isNamespaceMapped ? tenantId : null,
schemaName, tableName, nSaltBuckets);
+ }
+
+ public static String getViewIndexSequenceSchemaName(PName physicalName,
boolean isNamespaceMapped) {
+ if (!isNamespaceMapped) {
+ String baseTableName =
SchemaUtil.getParentTableNameFromIndexTable(physicalName.getString(),
+ MetaDataUtil.VIEW_INDEX_TABLE_PREFIX);
+ return SchemaUtil.getSchemaNameFromFullName(baseTableName);
+ } else {
+ return
SchemaUtil.getSchemaNameFromFullName(physicalName.toString());
+ }
+
+ }
+
public static String getViewIndexSequenceName(PName physicalName, PName
tenantId, boolean isNamespaceMapped) {
- if (!isNamespaceMapped) { return VIEW_INDEX_SEQUENCE_NAME_PREFIX; }
return SchemaUtil.getTableNameFromFullName(physicalName.toString()) +
VIEW_INDEX_SEQUENCE_NAME_PREFIX;
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 8d0aca1..c859bf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -36,11 +36,13 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_CAT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
@@ -53,6 +55,7 @@ import static
org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_
import java.io.IOException;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -106,6 +109,9 @@ import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.SequenceAllocation;
+import org.apache.phoenix.schema.SequenceAlreadyExistsException;
+import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PBinary;
@@ -2230,15 +2236,13 @@ public class UpgradeUtil {
String newSchemaName =
MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
String newSequenceName =
MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
// create new entry with new schema format
- String upsert = "UPSERT INTO " +
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " SELECT REGEXP_SPLIT("
- + PhoenixDatabaseMetaData.SEQUENCE_NAME + ",'_')[3] ,\'" +
newSchemaName + "\',\'" + newSequenceName
+ String upsert = "UPSERT INTO " +
PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " SELECT NULL,\'" + newSchemaName +
+ "\',\'" + newSequenceName
+ "\'," + START_WITH + "," + CURRENT_VALUE + "," +
INCREMENT_BY + "," + CACHE_SIZE + "," + MIN_VALUE
+ "," + MAX_VALUE + "," + CYCLE_FLAG + "," +
LIMIT_REACHED_FLAG + " FROM "
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " +
PhoenixDatabaseMetaData.TENANT_ID
+ " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA +
" = '" + oldSchemaName + "'";
connection.createStatement().executeUpdate(upsert);
- // delete old sequence
- MetaDataUtil.deleteViewIndexSequences(connection, oldPhysicalName,
false);
}
private static void updateLink(PhoenixConnection conn, String srcTableName,
@@ -2297,6 +2301,106 @@ public class UpgradeUtil {
}
}
+ public static void mergeViewIndexIdSequences(ConnectionQueryServices cqs,
PhoenixConnection metaConnection)
+ throws SQLException{
+ /* before PHOENIX-5132, there was a per-tenant sequence to generate
view index ids,
+ which could cause problems if global and tenant-owned view indexes
were mixed for the
+ same physical base table. Now there's just one sequence for all
view indexes of the same
+ physical table, but we have to check to see if there are any legacy
sequences, and
+ merge them into a single sequence equal to max + 101 (for a safety
margin)
+ of the largest legacy sequence to avoid collisions.
+ */
+ //map of physical table names to view index sequences
+ Map<String, List<SequenceKey>> sequenceTableMap = new HashMap<>();
+ DatabaseMetaData metaData = metaConnection.getMetaData();
+
+ try (ResultSet sequenceRS = metaData.getTables(null, null,
+ "%" + MetaDataUtil.VIEW_INDEX_SEQUENCE_NAME_PREFIX + "%",
+ new String[] {PhoenixDatabaseMetaData.SEQUENCE_TABLE_TYPE})) {
+ while (sequenceRS.next()) {
+ String tenantId = sequenceRS.getString(TABLE_CAT);
+ String schemaName = sequenceRS.getString(TABLE_SCHEM);
+ String sequenceName = sequenceRS.getString(TABLE_NAME);
+ int numBuckets = sequenceRS.getInt(SALT_BUCKETS);
+ SequenceKey key = new SequenceKey(tenantId, schemaName,
sequenceName, numBuckets);
+ String baseTableName;
+ //under the old naming convention, view index sequences
+ // of non-namespace mapped tables stored their physical table
name in the sequence schema for
+ //some reason. Namespace-mapped tables stored it in the
sequence name itself.
+ //Note the difference between VIEW_INDEX_SEQUENCE_PREFIX
(_SEQ_)
+ //and VIEW_INDEX_SEQUENCE_NAME_PREFIX (_ID_)
+ if (schemaName != null &&
schemaName.contains(MetaDataUtil.VIEW_INDEX_SEQUENCE_PREFIX)) {
+ baseTableName =
schemaName.replace(MetaDataUtil.VIEW_INDEX_SEQUENCE_PREFIX, "");
+ } else {
+ baseTableName = SchemaUtil.getTableName(schemaName,
+
sequenceName.replace(MetaDataUtil.VIEW_INDEX_SEQUENCE_NAME_PREFIX, ""));
+ }
+ if (!sequenceTableMap.containsKey(baseTableName)) {
+ sequenceTableMap.put(baseTableName, new
ArrayList<SequenceKey>());
+ }
+ sequenceTableMap.get(baseTableName).add(key);
+ }
+ }
+ for (String baseTableName : sequenceTableMap.keySet()){
+ Map<SequenceKey, Long> currentSequenceValues = new
HashMap<SequenceKey, Long>();
+ long maxViewIndexId = Long.MIN_VALUE;
+ PName name = PNameFactory.newName(baseTableName);
+ boolean hasNamespaceMapping =
+ SchemaUtil.isNamespaceMappingEnabled(null,
cqs.getConfiguration()) ||
+
cqs.getProps().getBoolean(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, false);
+ List<SequenceKey> existingSequenceKeys =
sequenceTableMap.get(baseTableName);
+ for (SequenceKey sequenceKey : existingSequenceKeys){
+ long[] currentValueArray = new long[1];
+ SQLException[] sqlExceptions = new SQLException[1];
+ cqs.incrementSequences(
+ Lists.newArrayList(new SequenceAllocation(sequenceKey,
1L)),
+ EnvironmentEdgeManager.currentTimeMillis(),
+ currentValueArray, new SQLException[1]);
+
+ if (sqlExceptions[0] != null) {
+ logger.error("Unable to convert view index sequence
because of error. " +
+ "It will need to be converted manually, " +
+ " or there's a risk that two view indexes of the same
base table " +
+ "will have colliding view index ids.",
sqlExceptions[0]);
+ continue;
+ }
+ if (currentValueArray[0] > maxViewIndexId){
+ maxViewIndexId = currentValueArray[0];
+ }
+ currentSequenceValues.put(sequenceKey, currentValueArray[0]);
+ }
+ //just in case someone is creating a view index RIGHT NOW,
increment maxViewIndexId
+ //by 100 to make very sure there are no collisions
+ maxViewIndexId += 100;
+ try {
+ //In one case (namespace-mapped base table, global view
index), the new sequence
+ //is the same as the old sequence, so rather than create it we
just increment it
+ //to the right value.
+ SequenceKey newSequenceKey = new SequenceKey(null,
MetaDataUtil.getViewIndexSequenceSchemaName(name, hasNamespaceMapping),
+ MetaDataUtil.getViewIndexSequenceName(name, null,
hasNamespaceMapping), cqs.getSequenceSaltBuckets());
+ if (currentSequenceValues.containsKey(newSequenceKey)){
+ long incrementValue = maxViewIndexId -
currentSequenceValues.get(newSequenceKey);
+ SQLException[] incrementExceptions = new SQLException[1];
+ List<SequenceAllocation> incrementAllocations =
Lists.newArrayList(new SequenceAllocation(newSequenceKey, incrementValue));
+ cqs.incrementSequences(incrementAllocations,
EnvironmentEdgeManager.currentTimeMillis(),
+ new long[1], incrementExceptions);
+ if (incrementExceptions[0] != null){
+ throw incrementExceptions[0];
+ }
+ } else {
+ cqs.createSequence(null, newSequenceKey.getSchemaName(),
+ newSequenceKey.getSequenceName(), maxViewIndexId, 1, 1,
+ Long.MIN_VALUE, Long.MAX_VALUE,
+ false, EnvironmentEdgeManager.currentTimeMillis());
+ }
+ } catch(SequenceAlreadyExistsException sae) {
+ logger.info("Tried to create view index sequence "
+ + SchemaUtil.getTableName(sae.getSchemaName(),
sae.getSequenceName()) +
+ " during upgrade but it already existed. This is probably
fine.");
+ }
+ }
+ }
+
public static final String getSysCatalogSnapshotName(long
currentSystemTableTimestamp) {
String tableString = SYSTEM_CATALOG_NAME;
Format formatter = new SimpleDateFormat("yyyyMMddHHmmss");